1 // Copyright 2019 Google LLC 2 // 3 // Licensed under the Apache License, Version 2.0 (the "License"); 4 // you may not use this file except in compliance with the License. 5 // You may obtain a copy of the License at 6 // 7 // http://www.apache.org/licenses/LICENSE-2.0 8 // 9 // Unless required by applicable law or agreed to in writing, software 10 // distributed under the License is distributed on an "AS IS" BASIS, 11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12 // See the License for the specific language governing permissions and 13 // limitations under the License. 14 15 #ifndef GOOGLE_CLOUD_CPP_GOOGLE_CLOUD_STORAGE_PARALLEL_UPLOAD_H 16 #define GOOGLE_CLOUD_CPP_GOOGLE_CLOUD_STORAGE_PARALLEL_UPLOAD_H 17 18 #include "google/cloud/storage/client.h" 19 #include "google/cloud/storage/internal/tuple_filter.h" 20 #include "google/cloud/storage/object_stream.h" 21 #include "google/cloud/storage/version.h" 22 #include "google/cloud/future.h" 23 #include "google/cloud/internal/filesystem.h" 24 #include "google/cloud/status_or.h" 25 #include "absl/memory/memory.h" 26 #include "absl/types/optional.h" 27 #include <chrono> 28 #include <condition_variable> 29 #include <cstddef> 30 #include <fstream> 31 #include <functional> 32 #include <mutex> 33 #include <tuple> 34 #include <utility> 35 36 namespace google { 37 namespace cloud { 38 namespace storage { 39 inline namespace STORAGE_CLIENT_NS { 40 /** 41 * A parameter type indicating the maximum number of streams to 42 * `ParallelUploadFile`. 43 */ 44 class MaxStreams { 45 public: 46 // NOLINTNEXTLINE(google-explicit-constructor) MaxStreams(std::size_t value)47 MaxStreams(std::size_t value) : value_(value) {} value()48 std::size_t value() const { return value_; } 49 50 private: 51 std::size_t value_; 52 }; 53 54 /** 55 * A parameter type indicating the minimum stream size to `ParallelUploadFile`. 
56 * 57 * If `ParallelUploadFile`, receives this option it will attempt to make sure 58 * that every shard is at least this long. This might not apply to the last 59 * shard because it will be the remainder of the division of the file. 60 */ 61 class MinStreamSize { 62 public: 63 // NOLINTNEXTLINE(google-explicit-constructor) MinStreamSize(std::uintmax_t value)64 MinStreamSize(std::uintmax_t value) : value_(value) {} value()65 std::uintmax_t value() const { return value_; } 66 67 private: 68 std::uintmax_t value_; 69 }; 70 71 namespace internal { 72 73 class ParallelUploadFileShard; 74 struct CreateParallelUploadShards; 75 76 /** 77 * Return an empty option if Tuple contains an element of type T, otherwise 78 * return the value of the first element of type T 79 */ 80 template <typename T, typename Tuple, typename Enable = void> 81 struct ExtractFirstOccurenceOfTypeImpl { operatorExtractFirstOccurenceOfTypeImpl82 absl::optional<T> operator()(Tuple const&) { return absl::optional<T>(); } 83 }; 84 85 template <typename T, typename... Options> 86 struct ExtractFirstOccurenceOfTypeImpl< 87 T, std::tuple<Options...>, 88 typename std::enable_if< 89 Among<typename std::decay<Options>::type...>::template TPred< 90 typename std::decay<T>::type>::value>::type> { 91 absl::optional<T> operator()(std::tuple<Options...> const& tuple) { 92 return std::get<0>(StaticTupleFilter<Among<T>::template TPred>(tuple)); 93 } 94 }; 95 96 template <typename T, typename Tuple> 97 absl::optional<T> ExtractFirstOccurenceOfType(Tuple const& tuple) { 98 return ExtractFirstOccurenceOfTypeImpl<T, Tuple>()(tuple); 99 } 100 101 /** 102 * An option for `PrepareParallelUpload` to associate opaque data with upload. 103 * 104 * This is used by `CreateUploadShards()` to store additional information in the 105 * parallel upload persistent state. The additional information is where each 106 * shard starts in the uploaded file. 
107 */ 108 class ParallelUploadExtraPersistentState { 109 public: 110 std::string payload() && { return std::move(payload_); } 111 std::string payload() const& { return payload_; } 112 113 private: 114 friend struct CreateParallelUploadShards; 115 explicit ParallelUploadExtraPersistentState(std::string payload) 116 : payload_(std::move(payload)) {} 117 118 std::string payload_; 119 }; 120 121 class ParallelObjectWriteStreambuf; 122 123 // Type-erased function object to execute ComposeMany with most arguments 124 // bound. 125 using Composer = std::function<StatusOr<ObjectMetadata>( 126 std::vector<ComposeSourceObject> const&)>; 127 128 struct ParallelUploadPersistentState { 129 struct Stream { 130 std::string object_name; 131 std::string resumable_session_id; 132 }; 133 134 std::string ToString() const; 135 static StatusOr<ParallelUploadPersistentState> FromString( 136 std::string const& json_rep); 137 138 std::string destination_object_name; 139 std::int64_t expected_generation; 140 std::string custom_data; 141 std::vector<Stream> streams; 142 }; 143 144 // The `ObjectWriteStream`s have to hold references to the state of 145 // the parallel upload so that they can update it when finished and trigger 146 // shards composition, hence `ResumableParallelUploadState` has to be 147 // destroyed after the `ObjectWriteStream`s. 148 // `ResumableParallelUploadState` and `ObjectWriteStream`s are passed 149 // around by values, so we don't control their lifetime. In order to 150 // circumvent it, we move the state to something held by a `shared_ptr`. 
151 class ParallelUploadStateImpl 152 : public std::enable_shared_from_this<ParallelUploadStateImpl> { 153 public: 154 ParallelUploadStateImpl(bool cleanup_on_failures, 155 std::string destination_object_name, 156 std::int64_t expected_generation, 157 std::shared_ptr<ScopedDeleter> deleter, 158 Composer composer); 159 ~ParallelUploadStateImpl(); 160 161 StatusOr<ObjectWriteStream> CreateStream( 162 RawClient& raw_client, ResumableUploadRequest const& request); 163 164 void AllStreamsFinished(std::unique_lock<std::mutex>& lk); 165 void StreamFinished(std::size_t stream_idx, 166 StatusOr<ResumableUploadResponse> const& response); 167 168 void StreamDestroyed(std::size_t stream_idx); 169 170 future<StatusOr<ObjectMetadata>> WaitForCompletion() const; 171 172 Status EagerCleanup(); 173 174 void Fail(Status status); 175 176 ParallelUploadPersistentState ToPersistentState() const; 177 178 std::string custom_data() const { 179 std::unique_lock<std::mutex> lk(mu_); 180 return custom_data_; 181 } 182 183 void set_custom_data(std::string custom_data) { 184 std::unique_lock<std::mutex> lk(mu_); 185 custom_data_ = std::move(custom_data); 186 } 187 188 std::string resumable_session_id() { 189 std::unique_lock<std::mutex> lk(mu_); 190 return resumable_session_id_; 191 } 192 193 void set_resumable_session_id(std::string resumable_session_id) { 194 std::unique_lock<std::mutex> lk(mu_); 195 resumable_session_id_ = std::move(resumable_session_id); 196 } 197 198 void PreventFromFinishing() { 199 std::unique_lock<std::mutex> lk(mu_); 200 ++num_unfinished_streams_; 201 } 202 203 void AllowFinishing() { 204 std::unique_lock<std::mutex> lk(mu_); 205 if (--num_unfinished_streams_ == 0) { 206 AllStreamsFinished(lk); 207 } 208 } 209 210 private: 211 struct StreamInfo { 212 std::string object_name; 213 std::string resumable_session_id; 214 absl::optional<ComposeSourceObject> composition_arg; 215 bool finished; 216 }; 217 218 mutable std::mutex mu_; 219 // Promises made via 
`WaitForCompletion()` 220 mutable std::vector<promise<StatusOr<ObjectMetadata>>> res_promises_; 221 // Type-erased object for deleting temporary objects. 222 std::shared_ptr<ScopedDeleter> deleter_; 223 // Type-erased function object to execute ComposeMany with most arguments 224 // bound. 225 std::function<StatusOr<ObjectMetadata>(std::vector<ComposeSourceObject>)> 226 composer_; 227 std::string destination_object_name_; 228 std::int64_t expected_generation_; 229 // Set when all streams are closed and composed but before cleanup. 230 bool finished_; 231 // Tracks how many streams are still written to. 232 std::size_t num_unfinished_streams_; 233 std::vector<StreamInfo> streams_; 234 absl::optional<StatusOr<ObjectMetadata>> res_; 235 Status cleanup_status_; 236 std::string custom_data_; 237 std::string resumable_session_id_; 238 }; 239 240 struct ComposeManyApplyHelper { 241 template <typename... Options> 242 StatusOr<ObjectMetadata> operator()(Options&&... options) const { 243 return ComposeMany(client, bucket_name, std::move(source_objects), prefix, 244 std::move(destination_object_name), true, 245 std::forward<Options>(options)...); 246 } 247 248 Client& client; 249 std::string bucket_name; 250 std::vector<ComposeSourceObject> source_objects; 251 std::string prefix; 252 std::string destination_object_name; 253 }; 254 255 class SetOptionsApplyHelper { 256 public: 257 // NOLINTNEXTLINE(google-explicit-constructor) 258 SetOptionsApplyHelper(ResumableUploadRequest& request) : request_(request) {} 259 260 template <typename... Options> 261 void operator()(Options&&... options) const { 262 request_.set_multiple_options(std::forward<Options>(options)...); 263 } 264 265 private: 266 ResumableUploadRequest& request_; 267 }; 268 269 struct ReadObjectApplyHelper { 270 template <typename... Options> 271 ObjectReadStream operator()(Options&&... 
options) const { 272 return client.ReadObject(bucket_name, object_name, 273 std::forward<Options>(options)...); 274 } 275 276 Client& client; 277 std::string const& bucket_name; 278 std::string const& object_name; 279 }; 280 281 struct GetObjectMetadataApplyHelper { 282 template <typename... Options> 283 StatusOr<ObjectMetadata> operator()(Options... options) const { 284 return client.GetObjectMetadata(bucket_name, object_name, 285 std::move(options)...); 286 } 287 288 Client& client; 289 std::string bucket_name; 290 std::string object_name; 291 }; 292 293 /** 294 * A class representing an individual shard of the parallel upload. 295 * 296 * In order to perform a parallel upload of a file, you should call 297 * `CreateUploadShards()` and it will return a vector of objects of this class. 298 * You should execute the `Upload()` member function on them in parallel to 299 * execute the upload. 300 * 301 * You can then obtain the status of the whole upload via `WaitForCompletion()`. 302 */ 303 class ParallelUploadFileShard { 304 public: 305 ParallelUploadFileShard(ParallelUploadFileShard const&) = delete; 306 ParallelUploadFileShard& operator=(ParallelUploadFileShard const&) = delete; 307 ParallelUploadFileShard(ParallelUploadFileShard&&) = default; 308 ParallelUploadFileShard& operator=(ParallelUploadFileShard&&) = default; 309 ~ParallelUploadFileShard(); 310 311 /** 312 * Perform the upload of this shard. 313 * 314 * This function will block until the shard is completed, or a permanent 315 * failure is encountered, or the retry policy is exhausted. 316 */ 317 Status Upload(); 318 319 /** 320 * Asynchronously wait for completion of the whole upload operation (not only 321 * this shard). 322 * 323 * @return the returned future will become satisfied once the whole upload 324 * operation finishes (i.e. 
`Upload()` completes on all shards); on 325 * success, it will hold the destination object's metadata 326 */ 327 future<StatusOr<ObjectMetadata>> WaitForCompletion() { 328 return state_->WaitForCompletion(); 329 } 330 331 /** 332 * Cleanup all the temporary files 333 * 334 * The destruction of the last of these objects tied to a parallel upload will 335 * cleanup of all the temporary files used in the process of that parallel 336 * upload. If the cleanup fails, it will fail silently not to crash the 337 * program. 338 * 339 * If you want to control the status of the cleanup, use this member function 340 * to do it eagerly, before destruction. 341 * 342 * It is enough to call it on one of the objects, but it is not invalid to 343 * call it on all objects. 344 */ 345 Status EagerCleanup() { return state_->EagerCleanup(); } 346 347 /** 348 * Retrieve resumable session ID to allow for potential future resume. 349 */ 350 std::string resumable_session_id() { return resumable_session_id_; } 351 352 private: 353 friend struct CreateParallelUploadShards; 354 ParallelUploadFileShard(std::shared_ptr<ParallelUploadStateImpl> state, 355 ObjectWriteStream ostream, std::string file_name, 356 std::uintmax_t offset_in_file, 357 std::uintmax_t bytes_to_upload, 358 std::size_t upload_buffer_size) 359 : state_(std::move(state)), 360 ostream_(std::move(ostream)), 361 file_name_(std::move(file_name)), 362 offset_in_file_(offset_in_file), 363 left_to_upload_(bytes_to_upload), 364 upload_buffer_size_(upload_buffer_size), 365 resumable_session_id_(state_->resumable_session_id()) {} 366 367 std::shared_ptr<ParallelUploadStateImpl> state_; 368 ObjectWriteStream ostream_; 369 std::string file_name_; 370 std::uintmax_t offset_in_file_; 371 std::uintmax_t left_to_upload_; 372 std::size_t upload_buffer_size_; 373 std::string resumable_session_id_; 374 }; 375 376 /** 377 * The state controlling uploading a GCS object via multiple parallel streams. 
378 * 379 * To use this class obtain the state via `PrepareParallelUpload` and then write 380 * the data to the streams associated with each shard. Once writing is done, 381 * close or destroy the streams. 382 * 383 * When all the streams are `Close`d or destroyed, this class will join the 384 * them (via `ComposeMany`) into the destination object and set the value in 385 * `future`s returned by `WaitForCompletion`. 386 * 387 * Parallel upload will create temporary files. Upon completion of the whole 388 * operation, this class will attempt to remove them in its destructor, but if 389 * they fail, they fail silently. In order to proactively cleanup these files, 390 * one can call `EagerCleanup()`. 391 */ 392 class NonResumableParallelUploadState { 393 public: 394 template <typename... Options> 395 static StatusOr<NonResumableParallelUploadState> Create( 396 Client client, std::string const& bucket_name, 397 std::string const& object_name, std::size_t num_shards, 398 std::string const& prefix, std::tuple<Options...> options); 399 400 /** 401 * Asynchronously wait for completion of the whole upload operation. 402 * 403 * @return the returned future will have a value set to the destination object 404 * metadata when all the streams are `Close`d or destroyed. 405 */ 406 future<StatusOr<ObjectMetadata>> WaitForCompletion() const { 407 return impl_->WaitForCompletion(); 408 } 409 410 /** 411 * Cleanup all the temporary files 412 * 413 * The destruction of this object will perform cleanup of all the temporary 414 * files used in the process of the parallel upload. If the cleanup fails, it 415 * will fail silently not to crash the program. 416 * 417 * If you want to control the status of the cleanup, use this member function 418 * to do it eagerly, before destruction. 419 * 420 * @return the status of the cleanup. 421 */ 422 Status EagerCleanup() { return impl_->EagerCleanup(); } 423 424 /** 425 * The streams to write to. 
426 * 427 * When the streams are `Close`d, they will be concatenated into the 428 * destination object in the same order as they appeared in this vector upon 429 * this object's creation. 430 * 431 * It is safe to destroy or `std::move()` these streams. 432 */ 433 std::vector<ObjectWriteStream>& shards() { return shards_; } 434 435 /** 436 * Fail the whole operation. 437 * 438 * If called before all streams are closed or destroyed, calling this 439 * operation will prevent composing the streams into the final destination 440 * object and return a failure via `WaitForCompletion()`. 441 * 442 * @param status the status to fail the operation with. 443 */ 444 void Fail(Status status) { return impl_->Fail(std::move(status)); } 445 446 private: 447 NonResumableParallelUploadState( 448 std::shared_ptr<ParallelUploadStateImpl> state, 449 std::vector<ObjectWriteStream> shards) 450 : impl_(std::move(state)), shards_(std::move(shards)) {} 451 452 std::shared_ptr<ParallelUploadStateImpl> impl_; 453 std::vector<ObjectWriteStream> shards_; 454 455 friend class NonResumableParallelObjectWriteStreambuf; 456 friend struct CreateParallelUploadShards; 457 }; 458 459 /** 460 * The state controlling uploading a GCS object via multiple parallel streams, 461 * allowing for resuming. 462 * 463 * To use this class obtain the state via `PrepareParallelUpload` (with 464 * `UseResumableUploadSession` option) and then write the data to the streams 465 * associated with each shard. Once writing is done, close or destroy the 466 * streams. 467 * 468 * When all the streams are `Close`d or destroyed, this class will join the 469 * them (via `ComposeMany`) into the destination object and set the value in 470 * `future`s returned by `WaitForCompletion`. 471 * 472 * Parallel upload will create temporary files. Upon successful completion of 473 * the whole operation, this class will attempt to remove them in its 474 * destructor, but if they fail, they fail silently. 
In order to proactively 475 * cleanup these files, one can call `EagerCleanup()`. 476 * 477 * In oder to resume an interrupted upload, provide `UseResumableUploadSession` 478 * to `PrepareParallelUpload` with value set to what `resumable_session_id()` 479 * returns. 480 */ 481 class ResumableParallelUploadState { 482 public: 483 static std::string session_id_prefix() { return "ParUpl:"; } 484 485 template <typename... Options> 486 static StatusOr<ResumableParallelUploadState> CreateNew( 487 Client client, std::string const& bucket_name, 488 std::string const& object_name, std::size_t num_shards, 489 std::string const& prefix, std::string const& extra_state, 490 std::tuple<Options...> const& options); 491 492 template <typename... Options> 493 static StatusOr<ResumableParallelUploadState> Resume( 494 Client client, std::string const& bucket_name, 495 std::string const& object_name, std::size_t num_shards, 496 std::string const& prefix, std::string const& resumable_session_id, 497 std::tuple<Options...> options); 498 499 /** 500 * Retrieve the resumable session id. 501 * 502 * This value, if passed via `UseResumableUploadSession` option indicates that 503 * an upload should be a continuation of the one which this object represents. 504 */ 505 std::string resumable_session_id() { return resumable_session_id_; } 506 507 /** 508 * Asynchronously wait for completion of the whole upload operation. 509 * 510 * @return the returned future will have a value set to the destination object 511 * metadata when all the streams are `Close`d or destroyed. 512 */ 513 future<StatusOr<ObjectMetadata>> WaitForCompletion() const { 514 return impl_->WaitForCompletion(); 515 } 516 517 /** 518 * Cleanup all the temporary files 519 * 520 * The destruction of this object will perform cleanup of all the temporary 521 * files used in the process of the parallel upload. If the cleanup fails, it 522 * will fail silently not to crash the program. 
523 * 524 * If you want to control the status of the cleanup, use this member function 525 * to do it eagerly, before destruction. 526 * 527 * @return the status of the cleanup. 528 */ 529 Status EagerCleanup() { return impl_->EagerCleanup(); } 530 531 /** 532 * The streams to write to. 533 * 534 * When the streams are `Close`d, they will be concatenated into the 535 * destination object in the same order as they appeared in this vector upon 536 * this object's creation. 537 * 538 * It is safe to destroy or `std::move()` these streams. 539 */ 540 std::vector<ObjectWriteStream>& shards() { return shards_; } 541 542 /** 543 * Fail the whole operation. 544 * 545 * If called before all streams are closed or destroyed, calling this 546 * operation will prevent composing the streams into the final destination 547 * object and return a failure via `WaitForCompletion()`. 548 * 549 * @param status the status to fail the operation with. 550 */ 551 void Fail(Status status) { return impl_->Fail(std::move(status)); } 552 553 private: 554 template <typename... Options> 555 static std::shared_ptr<ScopedDeleter> CreateDeleter( 556 Client client, std::string const& bucket_name, 557 std::tuple<Options...> const& options); 558 559 template <typename... 
Options> 560 static Composer CreateComposer(Client client, std::string const& bucket_name, 561 std::string const& object_name, 562 std::int64_t expected_generation, 563 std::string const& prefix, 564 std::tuple<Options...> const& options); 565 566 ResumableParallelUploadState(std::string resumable_session_id, 567 std::shared_ptr<ParallelUploadStateImpl> state, 568 std::vector<ObjectWriteStream> shards) 569 : resumable_session_id_(std::move(resumable_session_id)), 570 impl_(std::move(state)), 571 shards_(std::move(shards)) {} 572 573 std::string resumable_session_id_; 574 std::shared_ptr<ParallelUploadStateImpl> impl_; 575 std::vector<ObjectWriteStream> shards_; 576 577 friend class ResumableParallelObjectWriteStreambuf; 578 friend struct CreateParallelUploadShards; 579 }; 580 581 /** 582 * Prepare a parallel upload state. 583 * 584 * The returned `NonResumableParallelUploadState` will contain streams to which 585 * data can be uploaded in parallel. 586 * 587 * @param client the client on which to perform the operation. 588 * @param bucket_name the name of the bucket that will contain the object. 589 * @param object_name the uploaded object name. 590 * @param num_shards how many streams to upload the object through. 591 * @param prefix the prefix with which temporary objects will be created. 592 * @param options a list of optional query parameters and/or request headers. 593 * Valid types for this operation include `DestinationPredefinedAcl`, 594 * `EncryptionKey`, `IfGenerationMatch`, `IfMetagenerationMatch`, 595 * `KmsKeyName`, `QuotaUser`, `UserIp`, `UserProject`, `WithObjectMetadata`. 596 * 597 * @return the state of the parallel upload 598 * 599 * @par Idempotency 600 * This operation is not idempotent. While each request performed by this 601 * function is retried based on the client policies, the operation itself stops 602 * on the first request that fails. 603 */ 604 template <typename... 
Options, 605 typename std::enable_if< 606 NotAmong<typename std::decay<Options>::type...>::template TPred< 607 UseResumableUploadSession>::value, 608 int>::type EnableIfNotResumable = 0> 609 StatusOr<NonResumableParallelUploadState> PrepareParallelUpload( 610 Client client, std::string const& bucket_name, 611 std::string const& object_name, std::size_t num_shards, 612 std::string const& prefix, Options&&... options) { 613 return NonResumableParallelUploadState::Create( 614 std::move(client), bucket_name, object_name, num_shards, prefix, 615 StaticTupleFilter<NotAmong<ParallelUploadExtraPersistentState>::TPred>( 616 std::forward_as_tuple(std::forward<Options>(options)...))); 617 } 618 619 template <typename... Options, 620 typename std::enable_if< 621 Among<typename std::decay<Options>::type...>::template TPred< 622 UseResumableUploadSession>::value, 623 int>::type EnableIfResumable = 0> 624 StatusOr<ResumableParallelUploadState> PrepareParallelUpload( 625 Client client, std::string const& bucket_name, 626 std::string const& object_name, std::size_t num_shards, 627 std::string const& prefix, Options&&... options) { 628 auto resumable_args = 629 StaticTupleFilter<Among<UseResumableUploadSession>::TPred>( 630 std::tie(options...)); 631 static_assert(std::tuple_size<decltype(resumable_args)>::value == 1, 632 "The should be exacly one UseResumableUploadSession argument"); 633 std::string resumable_session_id = std::get<0>(resumable_args).value(); 634 auto extra_state_arg = 635 ExtractFirstOccurenceOfType<ParallelUploadExtraPersistentState>( 636 std::tie(options...)); 637 638 auto forwarded_args = 639 StaticTupleFilter<NotAmong<UseResumableUploadSession, 640 ParallelUploadExtraPersistentState>::TPred>( 641 std::forward_as_tuple(std::forward<Options>(options)...)); 642 643 if (resumable_session_id.empty()) { 644 return ResumableParallelUploadState::CreateNew( 645 std::move(client), bucket_name, object_name, num_shards, prefix, 646 extra_state_arg ? 
std::move(extra_state_arg).value().payload() 647 : std::string(), 648 std::move(forwarded_args)); 649 } 650 return ResumableParallelUploadState::Resume( 651 std::move(client), bucket_name, object_name, num_shards, prefix, 652 resumable_session_id, std::move(forwarded_args)); 653 } 654 655 template <typename... Options> 656 StatusOr<NonResumableParallelUploadState> 657 NonResumableParallelUploadState::Create(Client client, 658 std::string const& bucket_name, 659 std::string const& object_name, 660 std::size_t num_shards, 661 std::string const& prefix, 662 std::tuple<Options...> options) { 663 using internal::StaticTupleFilter; 664 auto delete_options = 665 StaticTupleFilter<Among<QuotaUser, UserProject, UserIp>::TPred>(options); 666 auto deleter = std::make_shared<ScopedDeleter>( 667 [client, bucket_name, delete_options](std::string const& object_name, 668 std::int64_t generation) mutable { 669 return google::cloud::internal::apply( 670 DeleteApplyHelper{client, std::move(bucket_name), object_name}, 671 std::tuple_cat(std::make_tuple(IfGenerationMatch(generation)), 672 std::move(delete_options))); 673 }); 674 675 auto compose_options = StaticTupleFilter< 676 Among<DestinationPredefinedAcl, EncryptionKey, IfGenerationMatch, 677 IfMetagenerationMatch, KmsKeyName, QuotaUser, UserIp, UserProject, 678 WithObjectMetadata>::TPred>(options); 679 auto composer = [client, bucket_name, object_name, compose_options, prefix]( 680 std::vector<ComposeSourceObject> const& sources) mutable { 681 return google::cloud::internal::apply( 682 ComposeManyApplyHelper{client, std::move(bucket_name), 683 std::move(sources), prefix + ".compose_many", 684 std::move(object_name)}, 685 std::move(compose_options)); 686 }; 687 688 auto lock = internal::LockPrefix(client, bucket_name, prefix, options); 689 if (!lock) { 690 return Status( 691 lock.status().code(), 692 "Failed to lock prefix for ParallelUpload: " + lock.status().message()); 693 } 694 deleter->Add(*lock); 695 696 auto internal_state = 
std::make_shared<ParallelUploadStateImpl>( 697 true, object_name, 0, std::move(deleter), std::move(composer)); 698 std::vector<ObjectWriteStream> streams; 699 700 auto upload_options = StaticTupleFilter< 701 Among<ContentEncoding, ContentType, DisableCrc32cChecksum, DisableMD5Hash, 702 EncryptionKey, KmsKeyName, PredefinedAcl, UserProject, 703 WithObjectMetadata>::TPred>(std::move(options)); 704 auto& raw_client = *client.raw_client_; 705 for (std::size_t i = 0; i < num_shards; ++i) { 706 ResumableUploadRequest request( 707 bucket_name, prefix + ".upload_shard_" + std::to_string(i)); 708 google::cloud::internal::apply(SetOptionsApplyHelper(request), 709 upload_options); 710 auto stream = internal_state->CreateStream(raw_client, request); 711 if (!stream) { 712 return stream.status(); 713 } 714 streams.emplace_back(*std::move(stream)); 715 } 716 return NonResumableParallelUploadState(std::move(internal_state), 717 std::move(streams)); 718 } 719 720 template <typename... Options> 721 std::shared_ptr<ScopedDeleter> ResumableParallelUploadState::CreateDeleter( 722 Client client, // NOLINT(performance-unnecessary-value-param) 723 std::string const& bucket_name, std::tuple<Options...> const& options) { 724 using internal::StaticTupleFilter; 725 auto delete_options = 726 StaticTupleFilter<Among<QuotaUser, UserProject, UserIp>::TPred>(options); 727 return std::make_shared<ScopedDeleter>( 728 [client, bucket_name, delete_options](std::string const& object_name, 729 std::int64_t generation) mutable { 730 return google::cloud::internal::apply( 731 DeleteApplyHelper{client, std::move(bucket_name), object_name}, 732 std::tuple_cat(std::make_tuple(IfGenerationMatch(generation)), 733 std::move(delete_options))); 734 }); 735 } 736 737 template <typename... 
Options> 738 Composer ResumableParallelUploadState::CreateComposer( 739 Client client, // NOLINT(performance-unnecessary-value-param) 740 std::string const& bucket_name, std::string const& object_name, 741 std::int64_t expected_generation, std::string const& prefix, 742 std::tuple<Options...> const& options) { 743 auto compose_options = std::tuple_cat( 744 StaticTupleFilter< 745 Among<DestinationPredefinedAcl, EncryptionKey, KmsKeyName, QuotaUser, 746 UserIp, UserProject, WithObjectMetadata>::TPred>(options), 747 std::make_tuple(IfGenerationMatch(expected_generation))); 748 auto get_metadata_options = StaticTupleFilter< 749 Among<DestinationPredefinedAcl, EncryptionKey, KmsKeyName, QuotaUser, 750 UserIp, UserProject, WithObjectMetadata>::TPred>(options); 751 auto composer = 752 [client, bucket_name, object_name, compose_options, get_metadata_options, 753 prefix](std::vector<ComposeSourceObject> const& sources) mutable 754 -> StatusOr<ObjectMetadata> { 755 auto res = google::cloud::internal::apply( 756 ComposeManyApplyHelper{client, bucket_name, std::move(sources), 757 prefix + ".compose_many", object_name}, 758 std::move(compose_options)); 759 if (res) { 760 return res; 761 } 762 if (res.status().code() != StatusCode::kFailedPrecondition) { 763 return res.status(); 764 } 765 // This means that the object already exists and it is not the object, which 766 // existed upon start of parallel upload. For simplicity, we assume that 767 // it's a result of a previously interrupted ComposeMany invocation. 768 return google::cloud::internal::apply( 769 GetObjectMetadataApplyHelper{client, std::move(bucket_name), 770 std::move(object_name)}, 771 std::move(get_metadata_options)); 772 }; 773 return Composer(std::move(composer)); 774 } 775 776 template <typename... 
Options> 777 StatusOr<ResumableParallelUploadState> ResumableParallelUploadState::CreateNew( 778 Client client, std::string const& bucket_name, 779 std::string const& object_name, std::size_t num_shards, 780 std::string const& prefix, std::string const& extra_state, 781 std::tuple<Options...> const& options) { 782 using internal::StaticTupleFilter; 783 784 auto get_object_meta_options = StaticTupleFilter< 785 Among<IfGenerationMatch, IfGenerationNotMatch, IfMetagenerationMatch, 786 IfMetagenerationNotMatch, UserProject>::TPred>(options); 787 auto object_meta = google::cloud::internal::apply( 788 GetObjectMetadataApplyHelper{client, bucket_name, object_name}, 789 std::move(get_object_meta_options)); 790 if (!object_meta && object_meta.status().code() != StatusCode::kNotFound) { 791 return object_meta.status(); 792 } 793 std::int64_t expected_generation = 794 object_meta ? object_meta->generation() : 0; 795 796 auto deleter = CreateDeleter(client, bucket_name, options); 797 auto composer = CreateComposer(client, bucket_name, object_name, 798 expected_generation, prefix, options); 799 auto internal_state = std::make_shared<ParallelUploadStateImpl>( 800 false, object_name, expected_generation, deleter, std::move(composer)); 801 internal_state->set_custom_data(std::move(extra_state)); 802 803 std::vector<ObjectWriteStream> streams; 804 805 auto upload_options = std::tuple_cat( 806 StaticTupleFilter< 807 Among<ContentEncoding, ContentType, DisableCrc32cChecksum, 808 DisableMD5Hash, EncryptionKey, KmsKeyName, PredefinedAcl, 809 UserProject, WithObjectMetadata>::TPred>(options), 810 std::make_tuple(UseResumableUploadSession(""))); 811 auto& raw_client = *client.raw_client_; 812 for (std::size_t i = 0; i < num_shards; ++i) { 813 ResumableUploadRequest request( 814 bucket_name, prefix + ".upload_shard_" + std::to_string(i)); 815 google::cloud::internal::apply(SetOptionsApplyHelper(request), 816 upload_options); 817 auto stream = internal_state->CreateStream(raw_client, 
request); 818 if (!stream) { 819 return stream.status(); 820 } 821 streams.emplace_back(*std::move(stream)); 822 } 823 824 auto state_object_name = prefix + ".upload_state"; 825 auto insert_options = std::tuple_cat( 826 std::make_tuple(IfGenerationMatch(0)), 827 StaticTupleFilter< 828 Among<PredefinedAcl, EncryptionKey, KmsKeyName, QuotaUser, UserIp, 829 UserProject, WithObjectMetadata>::TPred>(options)); 830 auto state_object = google::cloud::internal::apply( 831 InsertObjectApplyHelper{client, bucket_name, state_object_name, 832 internal_state->ToPersistentState().ToString()}, 833 std::move(insert_options)); 834 if (!state_object) { 835 internal_state->Fail(state_object.status()); 836 return std::move(state_object).status(); 837 } 838 std::string resumable_session_id = session_id_prefix() + state_object_name + 839 ":" + 840 std::to_string(state_object->generation()); 841 internal_state->set_resumable_session_id(resumable_session_id); 842 deleter->Add(std::move(*state_object)); 843 return ResumableParallelUploadState(std::move(resumable_session_id), 844 std::move(internal_state), 845 std::move(streams)); 846 } 847 848 StatusOr<std::pair<std::string, std::int64_t>> ParseResumableSessionId( 849 std::string const& session_id); 850 851 template <typename... 
Options> 852 StatusOr<ResumableParallelUploadState> ResumableParallelUploadState::Resume( 853 Client client, std::string const& bucket_name, 854 std::string const& object_name, std::size_t num_shards, 855 std::string const& prefix, std::string const& resumable_session_id, 856 std::tuple<Options...> options) { 857 using internal::StaticTupleFilter; 858 859 auto state_and_gen = ParseResumableSessionId(resumable_session_id); 860 if (!state_and_gen) { 861 return state_and_gen.status(); 862 } 863 864 auto read_options = std::tuple_cat( 865 StaticTupleFilter<Among<DisableCrc32cChecksum, DisableMD5Hash, 866 EncryptionKey, Generation, UserProject>::TPred>( 867 options), 868 std::make_tuple(IfGenerationMatch(state_and_gen->second))); 869 870 auto state_stream = google::cloud::internal::apply( 871 ReadObjectApplyHelper{client, bucket_name, state_and_gen->first}, 872 std::move(read_options)); 873 std::string state_string(std::istreambuf_iterator<char>{state_stream}, {}); 874 state_stream.Close(); 875 876 auto persistent_state = 877 ParallelUploadPersistentState::FromString(state_string); 878 if (!persistent_state) { 879 return persistent_state.status(); 880 } 881 882 if (persistent_state->destination_object_name != object_name) { 883 return Status(StatusCode::kInternal, 884 "Specified resumable session ID is doesn't match the " 885 "destination object name (" + 886 object_name + " vs " + 887 persistent_state->destination_object_name + ")"); 888 } 889 if (persistent_state->streams.size() != num_shards && num_shards != 0) { 890 return Status(StatusCode::kInternal, 891 "Specified resumable session ID is doesn't match the " 892 "previously specified number of shards (" + 893 std::to_string(num_shards) + " vs " + 894 std::to_string(persistent_state->streams.size()) + ")"); 895 } 896 897 auto deleter = CreateDeleter(client, bucket_name, options); 898 deleter->Add(state_and_gen->first, state_and_gen->second); 899 auto composer = 900 CreateComposer(client, bucket_name, object_name, 
901 persistent_state->expected_generation, prefix, options); 902 auto internal_state = std::make_shared<ParallelUploadStateImpl>( 903 false, object_name, persistent_state->expected_generation, deleter, 904 std::move(composer)); 905 internal_state->set_custom_data(std::move(persistent_state->custom_data)); 906 internal_state->set_resumable_session_id(resumable_session_id); 907 // If a resumed stream is already finalized, callbacks from streams will be 908 // executed immediately. We don't want them to trigger composition before all 909 // of them are created. 910 internal_state->PreventFromFinishing(); 911 std::vector<ObjectWriteStream> streams; 912 913 auto upload_options = StaticTupleFilter< 914 Among<ContentEncoding, ContentType, DisableCrc32cChecksum, DisableMD5Hash, 915 EncryptionKey, KmsKeyName, PredefinedAcl, UserProject, 916 WithObjectMetadata>::TPred>(std::move(options)); 917 auto& raw_client = *client.raw_client_; 918 for (auto& stream_desc : persistent_state->streams) { 919 ResumableUploadRequest request(bucket_name, 920 std::move(stream_desc.object_name)); 921 google::cloud::internal::apply( 922 SetOptionsApplyHelper(request), 923 std::tuple_cat(upload_options, 924 std::make_tuple(UseResumableUploadSession( 925 std::move(stream_desc.resumable_session_id))))); 926 auto stream = internal_state->CreateStream(raw_client, request); 927 if (!stream) { 928 internal_state->AllowFinishing(); 929 return stream.status(); 930 } 931 streams.emplace_back(*std::move(stream)); 932 } 933 934 internal_state->AllowFinishing(); 935 return ResumableParallelUploadState(std::move(resumable_session_id), 936 std::move(internal_state), 937 std::move(streams)); 938 } 939 940 template <typename... 
Options> 941 std::vector<std::uintmax_t> ComputeParallelFileUploadSplitPoints( 942 std::uintmax_t file_size, std::tuple<Options...> const& options) { 943 auto div_ceil = [](std::uintmax_t dividend, std::uintmax_t divisor) { 944 return (dividend + divisor - 1) / divisor; 945 }; 946 // These defaults were obtained by experiments summarized in 947 // https://github.com/googleapis/google-cloud-cpp/issues/2951#issuecomment-566237128 948 MaxStreams const default_max_streams(64); 949 MinStreamSize const default_min_stream_size(32 * 1024 * 1024); 950 951 auto const min_stream_size = 952 (std::max<std::uintmax_t>)(1, ExtractFirstOccurenceOfType<MinStreamSize>( 953 options) 954 .value_or(default_min_stream_size) 955 .value()); 956 auto const max_streams = ExtractFirstOccurenceOfType<MaxStreams>(options) 957 .value_or(default_max_streams) 958 .value(); 959 960 auto const wanted_num_streams = 961 (std::max< 962 std::uintmax_t>)(1, (std::min<std::uintmax_t>)(max_streams, 963 div_ceil( 964 file_size, 965 min_stream_size))); 966 967 auto const stream_size = 968 (std::max<std::uintmax_t>)(1, div_ceil(file_size, wanted_num_streams)); 969 970 std::vector<std::uintmax_t> res; 971 for (auto split = stream_size; split < file_size; split += stream_size) { 972 res.push_back(split); 973 } 974 return res; 975 } 976 977 std::string ParallelFileUploadSplitPointsToString( 978 std::vector<std::uintmax_t> const& split_points); 979 980 StatusOr<std::vector<std::uintmax_t>> ParallelFileUploadSplitPointsFromString( 981 std::string const& s); 982 983 /** 984 * Helper functor to call `PrepareParallelUpload` via `apply`. 985 * 986 * This object holds only references to objects, hence it should not be stored. 987 * Instead, it should be used only as a transient object allowing for calling 988 * `PrepareParallelUpload` via `apply`. 989 */ 990 struct PrepareParallelUploadApplyHelper { 991 // Some gcc versions crash on using decltype for return type here. 992 template <typename... 
Options> 993 StatusOr<typename std::conditional< 994 Among<typename std::decay<Options>::type...>::template TPred< 995 UseResumableUploadSession>::value, 996 ResumableParallelUploadState, NonResumableParallelUploadState>::type> 997 operator()(Options&&... options) { 998 return PrepareParallelUpload(std::move(client), bucket_name, object_name, 999 num_shards, prefix, 1000 std::forward<Options>(options)...); 1001 } 1002 1003 Client client; 1004 std::string const& bucket_name; 1005 std::string const& object_name; 1006 std::size_t num_shards; 1007 std::string const& prefix; 1008 }; 1009 1010 struct CreateParallelUploadShards { 1011 /** 1012 * Prepare a parallel upload of a given file. 1013 * 1014 * The returned opaque objects reflect computed shards of the given file. Each 1015 * of them has an `Upload()` member function which will perform the upload of 1016 * that shard. You should parallelize running this function on them according 1017 * to your needs. You can affect how many shards will be created by using the 1018 * `MaxStreams` and `MinStreamSize` options. 1019 * 1020 * Any of the returned objects can be used for obtaining the metadata of the 1021 * resulting object. 1022 * 1023 * @param client the client on which to perform the operation. 1024 * @param file_name the path to the file to be uploaded 1025 * @param bucket_name the name of the bucket that will contain the object. 1026 * @param object_name the uploaded object name. 1027 * @param prefix the prefix with which temporary objects will be created. 1028 * @param options a list of optional query parameters and/or request headers. 1029 * Valid types for this operation include `DestinationPredefinedAcl`, 1030 * `EncryptionKey`, `IfGenerationMatch`, `IfMetagenerationMatch`, 1031 * `KmsKeyName`, `MaxStreams, `MinStreamSize`, `QuotaUser`, `UserIp`, 1032 * `UserProject`, `WithObjectMetadata`, `UseResumableUploadSession`. 
1033 * 1034 * @return the shards of the input file to be uploaded in parallel 1035 * 1036 * @par Idempotency 1037 * This operation is not idempotent. While each request performed by this 1038 * function is retried based on the client policies, the operation itself 1039 * stops on the first request that fails. 1040 * 1041 * @par Example 1042 * @snippet storage_object_file_transfer_samples.cc parallel upload file 1043 */ 1044 template <typename... Options> 1045 static StatusOr<std::vector<ParallelUploadFileShard>> Create( 1046 Client client, // NOLINT(performance-unnecessary-value-param) 1047 std::string file_name, std::string const& bucket_name, 1048 std::string const& object_name, std::string const& prefix, 1049 Options&&... options) { 1050 std::error_code size_err; 1051 auto file_size = google::cloud::internal::file_size(file_name, size_err); 1052 if (size_err) { 1053 return Status(StatusCode::kNotFound, size_err.message()); 1054 } 1055 1056 auto const resumable_session_id_arg = 1057 ExtractFirstOccurenceOfType<UseResumableUploadSession>( 1058 std::tie(options...)); 1059 bool const new_session = !resumable_session_id_arg || 1060 resumable_session_id_arg.value().value().empty(); 1061 auto upload_options = 1062 StaticTupleFilter<NotAmong<MaxStreams, MinStreamSize>::TPred>( 1063 std::tie(options...)); 1064 1065 std::vector<uintmax_t> file_split_points; 1066 std::size_t num_shards = 0; 1067 if (new_session) { 1068 file_split_points = 1069 ComputeParallelFileUploadSplitPoints(file_size, std::tie(options...)); 1070 num_shards = file_split_points.size() + 1; 1071 } 1072 1073 // Create the upload state. 
1074 auto state = google::cloud::internal::apply( 1075 PrepareParallelUploadApplyHelper{client, bucket_name, object_name, 1076 num_shards, prefix}, 1077 std::tuple_cat( 1078 std::move(upload_options), 1079 std::make_tuple(ParallelUploadExtraPersistentState( 1080 ParallelFileUploadSplitPointsToString(file_split_points))))); 1081 if (!state) { 1082 return state.status(); 1083 } 1084 1085 if (!new_session) { 1086 // We need to recreate the split points of the file. 1087 auto maybe_split_points = 1088 ParallelFileUploadSplitPointsFromString(state->impl_->custom_data()); 1089 if (!maybe_split_points) { 1090 state->Fail(maybe_split_points.status()); 1091 return std::move(maybe_split_points).status(); 1092 } 1093 file_split_points = *std::move(maybe_split_points); 1094 } 1095 1096 // Everything ready - we've got the shared state and the files open, let's 1097 // prepare the returned objects. 1098 auto upload_buffer_size = 1099 client.raw_client()->client_options().upload_buffer_size(); 1100 1101 file_split_points.emplace_back(file_size); 1102 assert(file_split_points.size() == state->shards().size()); 1103 std::vector<ParallelUploadFileShard> res; 1104 std::uintmax_t offset = 0; 1105 std::size_t shard_idx = 0; 1106 for (auto shard_end : file_split_points) { 1107 res.emplace_back(ParallelUploadFileShard( 1108 state->impl_, std::move(state->shards()[shard_idx++]), file_name, 1109 offset, shard_end - offset, upload_buffer_size)); 1110 offset = shard_end; 1111 } 1112 #if defined(__clang__) && \ 1113 (__clang_major__ < 4 || (__clang_major__ == 3 && __clang_minor__ <= 8)) 1114 // The extra std::move() is required to workaround a Clang <= 3.8 bug, which 1115 // tries to copy the result otherwise. 1116 return std::move(res); 1117 #else 1118 return res; 1119 #endif 1120 } 1121 }; 1122 1123 /// @copydoc CreateParallelUploadShards::Create() 1124 template <typename... 
Options>
StatusOr<std::vector<ParallelUploadFileShard>> CreateUploadShards(
    Client client,  // NOLINT(performance-unnecessary-value-param)
    std::string file_name, std::string const& bucket_name,
    std::string const& object_name, std::string const& prefix,
    Options&&... options) {
  // Free-function spelling of `CreateParallelUploadShards::Create()`: every
  // argument is forwarded unchanged and its result is returned as-is.
  return CreateParallelUploadShards::Create(
      std::move(client), std::move(file_name), bucket_name, object_name, prefix,
      std::forward<Options>(options)...);
}

}  // namespace internal

/**
 * Perform a parallel upload of a given file.
 *
 * You can affect how many shards will be created by using the `MaxStreams` and
 * `MinStreamSize` options.
 *
 * @param client the client on which to perform the operation.
 * @param file_name the path to the file to be uploaded
 * @param bucket_name the name of the bucket that will contain the object.
 * @param object_name the uploaded object name.
 * @param prefix the prefix with which temporary objects will be created.
 * @param ignore_cleanup_failures treat failures to cleanup the temporary
 *     objects as not fatal.
 * @param options a list of optional query parameters and/or request headers.
 *     Valid types for this operation include `DestinationPredefinedAcl`,
 *     `EncryptionKey`, `IfGenerationMatch`, `IfMetagenerationMatch`,
 *     `KmsKeyName`, `MaxStreams`, `MinStreamSize`, `QuotaUser`, `UserIp`,
 *     `UserProject`, `WithObjectMetadata`, `UseResumableUploadSession`.
 *
 * @return the metadata of the object created by the upload.
 *
 * @par Idempotency
 * This operation is not idempotent. While each request performed by this
 * function is retried based on the client policies, the operation itself stops
 * on the first request that fails.
 *
 * @par Example
 * @snippet storage_object_file_transfer_samples.cc parallel upload file
 */
template <typename...
Options> 1167 StatusOr<ObjectMetadata> ParallelUploadFile( 1168 Client client, std::string file_name, std::string bucket_name, 1169 std::string object_name, std::string prefix, bool ignore_cleanup_failures, 1170 Options&&... options) { 1171 auto shards = internal::CreateParallelUploadShards::Create( 1172 std::move(client), std::move(file_name), std::move(bucket_name), 1173 std::move(object_name), std::move(prefix), 1174 std::forward<Options>(options)...); 1175 if (!shards) { 1176 return shards.status(); 1177 } 1178 1179 std::vector<std::thread> threads; 1180 threads.reserve(shards->size()); 1181 for (auto& shard : *shards) { 1182 threads.emplace_back([&shard] { 1183 // We can safely ignore the status - if something fails we'll know 1184 // when obtaining final metadata. 1185 shard.Upload(); 1186 }); 1187 } 1188 for (auto& thread : threads) { 1189 thread.join(); 1190 } 1191 auto res = (*shards)[0].WaitForCompletion().get(); 1192 auto cleanup_res = (*shards)[0].EagerCleanup(); 1193 if (!cleanup_res.ok() && !ignore_cleanup_failures) { 1194 return cleanup_res; 1195 } 1196 return res; 1197 } 1198 1199 } // namespace STORAGE_CLIENT_NS 1200 } // namespace storage 1201 } // namespace cloud 1202 } // namespace google 1203 1204 #endif // GOOGLE_CLOUD_CPP_GOOGLE_CLOUD_STORAGE_PARALLEL_UPLOAD_H 1205