// Copyright 2019 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#ifndef GOOGLE_CLOUD_CPP_GOOGLE_CLOUD_STORAGE_PARALLEL_UPLOAD_H
#define GOOGLE_CLOUD_CPP_GOOGLE_CLOUD_STORAGE_PARALLEL_UPLOAD_H

#include "google/cloud/storage/client.h"
#include "google/cloud/storage/internal/tuple_filter.h"
#include "google/cloud/storage/object_stream.h"
#include "google/cloud/storage/version.h"
#include "google/cloud/future.h"
#include "google/cloud/internal/filesystem.h"
#include "google/cloud/status_or.h"
#include "absl/memory/memory.h"
#include "absl/types/optional.h"
#include <chrono>
#include <condition_variable>
#include <cstddef>
#include <fstream>
#include <functional>
#include <mutex>
#include <thread>
#include <tuple>
#include <utility>

namespace google {
namespace cloud {
namespace storage {
inline namespace STORAGE_CLIENT_NS {
/**
 * A parameter type indicating the maximum number of streams to
 * `ParallelUploadFile`.
 */
class MaxStreams {
 public:
  // NOLINTNEXTLINE(google-explicit-constructor)
  MaxStreams(std::size_t value) : value_(value) {}
  std::size_t value() const { return value_; }

 private:
  std::size_t value_;
};

/**
 * A parameter type indicating the minimum stream size to `ParallelUploadFile`.
 *
 * If `ParallelUploadFile` receives this option, it will attempt to make sure
 * that every shard is at least this large. This might not apply to the last
 * shard, which holds whatever remains after dividing the file.
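 *
 * @par Example (sketch)
 * A minimal sketch, assuming an initialized `Client` and the usual
 * `namespace gcs = ::google::cloud::storage;` alias; the file, bucket, object,
 * and prefix names are placeholders:
 * @code
 * // Ask for shards of at least 64 MiB each, using at most 8 streams.
 * auto metadata = gcs::ParallelUploadFile(
 *     client, "/tmp/some-file", "my-bucket", "my-object", "tmp-prefix", true,
 *     gcs::MinStreamSize(64 * 1024 * 1024), gcs::MaxStreams(8));
 * @endcode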
 */
class MinStreamSize {
 public:
  // NOLINTNEXTLINE(google-explicit-constructor)
  MinStreamSize(std::uintmax_t value) : value_(value) {}
  std::uintmax_t value() const { return value_; }

 private:
  std::uintmax_t value_;
};

namespace internal {

class ParallelUploadFileShard;
struct CreateParallelUploadShards;

/**
 * Return the value of the first element of type T in Tuple, or an empty
 *     optional if Tuple contains no element of type T.
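 *
 * For illustration (a sketch, assuming these are the only tuple elements):
 * @code
 * auto args = std::make_tuple(MinStreamSize(1024), MaxStreams(4));
 * auto max_streams = ExtractFirstOccurenceOfType<MaxStreams>(args);
 * // max_streams.has_value() == true, max_streams->value() == 4
 * auto missing = ExtractFirstOccurenceOfType<UserProject>(args);
 * // missing.has_value() == false
 * @endcode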
 */
template <typename T, typename Tuple, typename Enable = void>
struct ExtractFirstOccurenceOfTypeImpl {
  absl::optional<T> operator()(Tuple const&) { return absl::optional<T>(); }
};

template <typename T, typename... Options>
struct ExtractFirstOccurenceOfTypeImpl<
    T, std::tuple<Options...>,
    typename std::enable_if<
        Among<typename std::decay<Options>::type...>::template TPred<
            typename std::decay<T>::type>::value>::type> {
  absl::optional<T> operator()(std::tuple<Options...> const& tuple) {
    return std::get<0>(StaticTupleFilter<Among<T>::template TPred>(tuple));
  }
};

template <typename T, typename Tuple>
absl::optional<T> ExtractFirstOccurenceOfType(Tuple const& tuple) {
  return ExtractFirstOccurenceOfTypeImpl<T, Tuple>()(tuple);
}

/**
 * An option for `PrepareParallelUpload` to associate opaque data with an
 * upload.
 *
 * This is used by `CreateUploadShards()` to store additional information in the
 * parallel upload persistent state. The additional information is where each
 * shard starts in the uploaded file.
 */
class ParallelUploadExtraPersistentState {
 public:
  std::string payload() && { return std::move(payload_); }
  std::string payload() const& { return payload_; }

 private:
  friend struct CreateParallelUploadShards;
  explicit ParallelUploadExtraPersistentState(std::string payload)
      : payload_(std::move(payload)) {}

  std::string payload_;
};

class ParallelObjectWriteStreambuf;

// Type-erased function object to execute ComposeMany with most arguments
// bound.
using Composer = std::function<StatusOr<ObjectMetadata>(
    std::vector<ComposeSourceObject> const&)>;

struct ParallelUploadPersistentState {
  struct Stream {
    std::string object_name;
    std::string resumable_session_id;
  };

  std::string ToString() const;
  static StatusOr<ParallelUploadPersistentState> FromString(
      std::string const& json_rep);

  std::string destination_object_name;
  std::int64_t expected_generation;
  std::string custom_data;
  std::vector<Stream> streams;
};

// The `ObjectWriteStream`s have to hold references to the state of
// the parallel upload so that they can update it when finished and trigger
// shard composition, hence `ResumableParallelUploadState` has to be
// destroyed after the `ObjectWriteStream`s.
// `ResumableParallelUploadState` and `ObjectWriteStream`s are passed
// around by value, so we don't control their lifetime. To work around
// this, we move the state to something held by a `shared_ptr`.
class ParallelUploadStateImpl
    : public std::enable_shared_from_this<ParallelUploadStateImpl> {
 public:
  ParallelUploadStateImpl(bool cleanup_on_failures,
                          std::string destination_object_name,
                          std::int64_t expected_generation,
                          std::shared_ptr<ScopedDeleter> deleter,
                          Composer composer);
  ~ParallelUploadStateImpl();

  StatusOr<ObjectWriteStream> CreateStream(
      RawClient& raw_client, ResumableUploadRequest const& request);

  void AllStreamsFinished(std::unique_lock<std::mutex>& lk);
  void StreamFinished(std::size_t stream_idx,
                      StatusOr<ResumableUploadResponse> const& response);

  void StreamDestroyed(std::size_t stream_idx);

  future<StatusOr<ObjectMetadata>> WaitForCompletion() const;

  Status EagerCleanup();

  void Fail(Status status);

  ParallelUploadPersistentState ToPersistentState() const;

  std::string custom_data() const {
    std::unique_lock<std::mutex> lk(mu_);
    return custom_data_;
  }

  void set_custom_data(std::string custom_data) {
    std::unique_lock<std::mutex> lk(mu_);
    custom_data_ = std::move(custom_data);
  }

  std::string resumable_session_id() {
    std::unique_lock<std::mutex> lk(mu_);
    return resumable_session_id_;
  }

  void set_resumable_session_id(std::string resumable_session_id) {
    std::unique_lock<std::mutex> lk(mu_);
    resumable_session_id_ = std::move(resumable_session_id);
  }

  void PreventFromFinishing() {
    std::unique_lock<std::mutex> lk(mu_);
    ++num_unfinished_streams_;
  }

  void AllowFinishing() {
    std::unique_lock<std::mutex> lk(mu_);
    if (--num_unfinished_streams_ == 0) {
      AllStreamsFinished(lk);
    }
  }

 private:
  struct StreamInfo {
    std::string object_name;
    std::string resumable_session_id;
    absl::optional<ComposeSourceObject> composition_arg;
    bool finished;
  };

  mutable std::mutex mu_;
  // Promises made via `WaitForCompletion()`
  mutable std::vector<promise<StatusOr<ObjectMetadata>>> res_promises_;
  // Type-erased object for deleting temporary objects.
  std::shared_ptr<ScopedDeleter> deleter_;
  // Type-erased function object to execute ComposeMany with most arguments
  // bound.
  std::function<StatusOr<ObjectMetadata>(std::vector<ComposeSourceObject>)>
      composer_;
  std::string destination_object_name_;
  std::int64_t expected_generation_;
  // Set when all streams are closed and composed but before cleanup.
  bool finished_;
  // Tracks how many streams are still being written to.
  std::size_t num_unfinished_streams_;
  std::vector<StreamInfo> streams_;
  absl::optional<StatusOr<ObjectMetadata>> res_;
  Status cleanup_status_;
  std::string custom_data_;
  std::string resumable_session_id_;
};

struct ComposeManyApplyHelper {
  template <typename... Options>
  StatusOr<ObjectMetadata> operator()(Options&&... options) const {
    return ComposeMany(client, bucket_name, std::move(source_objects), prefix,
                       std::move(destination_object_name), true,
                       std::forward<Options>(options)...);
  }

  Client& client;
  std::string bucket_name;
  std::vector<ComposeSourceObject> source_objects;
  std::string prefix;
  std::string destination_object_name;
};

class SetOptionsApplyHelper {
 public:
  // NOLINTNEXTLINE(google-explicit-constructor)
  SetOptionsApplyHelper(ResumableUploadRequest& request) : request_(request) {}

  template <typename... Options>
  void operator()(Options&&... options) const {
    request_.set_multiple_options(std::forward<Options>(options)...);
  }

 private:
  ResumableUploadRequest& request_;
};

struct ReadObjectApplyHelper {
  template <typename... Options>
  ObjectReadStream operator()(Options&&... options) const {
    return client.ReadObject(bucket_name, object_name,
                             std::forward<Options>(options)...);
  }

  Client& client;
  std::string const& bucket_name;
  std::string const& object_name;
};

struct GetObjectMetadataApplyHelper {
  template <typename... Options>
  StatusOr<ObjectMetadata> operator()(Options... options) const {
    return client.GetObjectMetadata(bucket_name, object_name,
                                    std::move(options)...);
  }

  Client& client;
  std::string bucket_name;
  std::string object_name;
};

/**
 * A class representing an individual shard of the parallel upload.
 *
 * In order to perform a parallel upload of a file, call `CreateUploadShards()`;
 * it returns a vector of objects of this class. Execute the `Upload()` member
 * function on them in parallel to perform the upload.
 *
 * You can then obtain the status of the whole upload via `WaitForCompletion()`.
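 *
 * @par Example (sketch)
 * A minimal sketch of running the shards on separate threads, assuming the
 * usual `namespace gcs = ::google::cloud::storage;` alias; the file, bucket,
 * object, and prefix names are placeholders:
 * @code
 * auto shards = gcs::internal::CreateUploadShards(
 *     client, "/tmp/some-file", "my-bucket", "my-object", "tmp-prefix");
 * if (!shards) throw std::runtime_error(shards.status().message());
 * std::vector<std::thread> threads;
 * for (auto& shard : *shards) {
 *   threads.emplace_back([&shard] { shard.Upload(); });
 * }
 * for (auto& t : threads) t.join();
 * auto metadata = (*shards)[0].WaitForCompletion().get();
 * @endcode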
 */
class ParallelUploadFileShard {
 public:
  ParallelUploadFileShard(ParallelUploadFileShard const&) = delete;
  ParallelUploadFileShard& operator=(ParallelUploadFileShard const&) = delete;
  ParallelUploadFileShard(ParallelUploadFileShard&&) = default;
  ParallelUploadFileShard& operator=(ParallelUploadFileShard&&) = default;
  ~ParallelUploadFileShard();

  /**
   * Perform the upload of this shard.
   *
   * This function will block until the shard is completed, or a permanent
   *     failure is encountered, or the retry policy is exhausted.
   */
  Status Upload();

  /**
   * Asynchronously wait for completion of the whole upload operation (not only
   *     this shard).
   *
   * @return the returned future will become satisfied once the whole upload
   *     operation finishes (i.e. `Upload()` completes on all shards); on
   *     success, it will hold the destination object's metadata
   */
  future<StatusOr<ObjectMetadata>> WaitForCompletion() {
    return state_->WaitForCompletion();
  }

  /**
   * Clean up all the temporary files.
   *
   * The destruction of the last of these objects tied to a parallel upload
   * will clean up all the temporary files used in the process of that
   * parallel upload. If the cleanup fails, it fails silently so as not to
   * crash the program.
   *
   * If you want to control the status of the cleanup, use this member function
   * to do it eagerly, before destruction.
   *
   * It is enough to call it on one of the objects, but calling it on all of
   * them is also valid.
   */
  Status EagerCleanup() { return state_->EagerCleanup(); }

  /**
   * Retrieve the resumable session ID to allow resuming the upload later.
   */
  std::string resumable_session_id() { return resumable_session_id_; }

 private:
  friend struct CreateParallelUploadShards;
  ParallelUploadFileShard(std::shared_ptr<ParallelUploadStateImpl> state,
                          ObjectWriteStream ostream, std::string file_name,
                          std::uintmax_t offset_in_file,
                          std::uintmax_t bytes_to_upload,
                          std::size_t upload_buffer_size)
      : state_(std::move(state)),
        ostream_(std::move(ostream)),
        file_name_(std::move(file_name)),
        offset_in_file_(offset_in_file),
        left_to_upload_(bytes_to_upload),
        upload_buffer_size_(upload_buffer_size),
        resumable_session_id_(state_->resumable_session_id()) {}

  std::shared_ptr<ParallelUploadStateImpl> state_;
  ObjectWriteStream ostream_;
  std::string file_name_;
  std::uintmax_t offset_in_file_;
  std::uintmax_t left_to_upload_;
  std::size_t upload_buffer_size_;
  std::string resumable_session_id_;
};

/**
 * The state controlling uploading a GCS object via multiple parallel streams.
 *
 * To use this class obtain the state via `PrepareParallelUpload` and then write
 * the data to the streams associated with each shard. Once writing is done,
 * close or destroy the streams.
 *
 * When all the streams are `Close`d or destroyed, this class will join them
 * (via `ComposeMany`) into the destination object and set the value of the
 * `future`s returned by `WaitForCompletion`.
 *
 * Parallel upload will create temporary files. Upon completion of the whole
 * operation, this class will attempt to remove them in its destructor; if the
 * removal fails, it fails silently. In order to proactively clean up these
 * files, one can call `EagerCleanup()`.
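 *
 * @par Example (sketch)
 * A minimal sketch of the lifecycle, assuming an initialized `Client` and the
 * usual `namespace gcs = ::google::cloud::storage;` alias; the bucket, object,
 * and prefix names are placeholders:
 * @code
 * auto state = gcs::internal::PrepareParallelUpload(
 *     client, "my-bucket", "my-object", 2, "tmp-prefix");
 * if (!state) throw std::runtime_error(state.status().message());
 * state->shards()[0] << "first half of the data";
 * state->shards()[1] << "second half of the data";
 * for (auto& shard : state->shards()) shard.Close();
 * auto metadata = state->WaitForCompletion().get();
 * auto cleanup_status = state->EagerCleanup();
 * @endcode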
 */
class NonResumableParallelUploadState {
 public:
  template <typename... Options>
  static StatusOr<NonResumableParallelUploadState> Create(
      Client client, std::string const& bucket_name,
      std::string const& object_name, std::size_t num_shards,
      std::string const& prefix, std::tuple<Options...> options);

  /**
   * Asynchronously wait for completion of the whole upload operation.
   *
   * @return the returned future will have a value set to the destination object
   *     metadata when all the streams are `Close`d or destroyed.
   */
  future<StatusOr<ObjectMetadata>> WaitForCompletion() const {
    return impl_->WaitForCompletion();
  }

  /**
   * Clean up all the temporary files.
   *
   * The destruction of this object will perform cleanup of all the temporary
   * files used in the process of the parallel upload. If the cleanup fails, it
   * fails silently so as not to crash the program.
   *
   * If you want to control the status of the cleanup, use this member function
   * to do it eagerly, before destruction.
   *
   * @return the status of the cleanup.
   */
  Status EagerCleanup() { return impl_->EagerCleanup(); }

  /**
   * The streams to write to.
   *
   * When the streams are `Close`d, they will be concatenated into the
   * destination object in the same order as they appeared in this vector upon
   * this object's creation.
   *
   * It is safe to destroy or `std::move()` these streams.
   */
  std::vector<ObjectWriteStream>& shards() { return shards_; }

  /**
   * Fail the whole operation.
   *
   * Calling this operation before all streams are closed or destroyed prevents
   * composing the streams into the final destination object and makes
   * `WaitForCompletion()` return a failure.
   *
   * @param status the status to fail the operation with.
   */
  void Fail(Status status) { return impl_->Fail(std::move(status)); }

 private:
  NonResumableParallelUploadState(
      std::shared_ptr<ParallelUploadStateImpl> state,
      std::vector<ObjectWriteStream> shards)
      : impl_(std::move(state)), shards_(std::move(shards)) {}

  std::shared_ptr<ParallelUploadStateImpl> impl_;
  std::vector<ObjectWriteStream> shards_;

  friend class NonResumableParallelObjectWriteStreambuf;
  friend struct CreateParallelUploadShards;
};

/**
 * The state controlling uploading a GCS object via multiple parallel streams,
 * allowing for resuming.
 *
 * To use this class obtain the state via `PrepareParallelUpload` (with the
 * `UseResumableUploadSession` option) and then write the data to the streams
 * associated with each shard. Once writing is done, close or destroy the
 * streams.
 *
 * When all the streams are `Close`d or destroyed, this class will join them
 * (via `ComposeMany`) into the destination object and set the value of the
 * `future`s returned by `WaitForCompletion`.
 *
 * Parallel upload will create temporary files. Upon successful completion of
 * the whole operation, this class will attempt to remove them in its
 * destructor; if the removal fails, it fails silently. In order to proactively
 * clean up these files, one can call `EagerCleanup()`.
 *
 * In order to resume an interrupted upload, provide `UseResumableUploadSession`
 * to `PrepareParallelUpload` with its value set to what
 * `resumable_session_id()` returns.
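 *
 * @par Example (sketch)
 * A minimal sketch of resuming an interrupted upload, assuming the usual
 * `namespace gcs = ::google::cloud::storage;` alias and that `session_id`
 * holds the value returned by `resumable_session_id()` on a previous attempt:
 * @code
 * auto state = gcs::internal::PrepareParallelUpload(
 *     client, "my-bucket", "my-object", 2, "tmp-prefix",
 *     gcs::UseResumableUploadSession(session_id));
 * if (!state) throw std::runtime_error(state.status().message());
 * // Write any remaining data to state->shards(), close them, then:
 * auto metadata = state->WaitForCompletion().get();
 * @endcode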
 */
class ResumableParallelUploadState {
 public:
  static std::string session_id_prefix() { return "ParUpl:"; }

  template <typename... Options>
  static StatusOr<ResumableParallelUploadState> CreateNew(
      Client client, std::string const& bucket_name,
      std::string const& object_name, std::size_t num_shards,
      std::string const& prefix, std::string const& extra_state,
      std::tuple<Options...> const& options);

  template <typename... Options>
  static StatusOr<ResumableParallelUploadState> Resume(
      Client client, std::string const& bucket_name,
      std::string const& object_name, std::size_t num_shards,
      std::string const& prefix, std::string const& resumable_session_id,
      std::tuple<Options...> options);

  /**
   * Retrieve the resumable session id.
   *
   * Passing this value via the `UseResumableUploadSession` option indicates
   * that an upload should be a continuation of the one this object represents.
   */
  std::string resumable_session_id() { return resumable_session_id_; }

  /**
   * Asynchronously wait for completion of the whole upload operation.
   *
   * @return the returned future will have a value set to the destination object
   *     metadata when all the streams are `Close`d or destroyed.
   */
  future<StatusOr<ObjectMetadata>> WaitForCompletion() const {
    return impl_->WaitForCompletion();
  }

  /**
   * Clean up all the temporary files.
   *
   * The destruction of this object will perform cleanup of all the temporary
   * files used in the process of the parallel upload. If the cleanup fails, it
   * fails silently so as not to crash the program.
   *
   * If you want to control the status of the cleanup, use this member function
   * to do it eagerly, before destruction.
   *
   * @return the status of the cleanup.
   */
  Status EagerCleanup() { return impl_->EagerCleanup(); }

  /**
   * The streams to write to.
   *
   * When the streams are `Close`d, they will be concatenated into the
   * destination object in the same order as they appeared in this vector upon
   * this object's creation.
   *
   * It is safe to destroy or `std::move()` these streams.
   */
  std::vector<ObjectWriteStream>& shards() { return shards_; }

  /**
   * Fail the whole operation.
   *
   * Calling this operation before all streams are closed or destroyed prevents
   * composing the streams into the final destination object and makes
   * `WaitForCompletion()` return a failure.
   *
   * @param status the status to fail the operation with.
   */
  void Fail(Status status) { return impl_->Fail(std::move(status)); }

 private:
  template <typename... Options>
  static std::shared_ptr<ScopedDeleter> CreateDeleter(
      Client client, std::string const& bucket_name,
      std::tuple<Options...> const& options);

  template <typename... Options>
  static Composer CreateComposer(Client client, std::string const& bucket_name,
                                 std::string const& object_name,
                                 std::int64_t expected_generation,
                                 std::string const& prefix,
                                 std::tuple<Options...> const& options);

  ResumableParallelUploadState(std::string resumable_session_id,
                               std::shared_ptr<ParallelUploadStateImpl> state,
                               std::vector<ObjectWriteStream> shards)
      : resumable_session_id_(std::move(resumable_session_id)),
        impl_(std::move(state)),
        shards_(std::move(shards)) {}

  std::string resumable_session_id_;
  std::shared_ptr<ParallelUploadStateImpl> impl_;
  std::vector<ObjectWriteStream> shards_;

  friend class ResumableParallelObjectWriteStreambuf;
  friend struct CreateParallelUploadShards;
};

/**
 * Prepare a parallel upload state.
 *
 * The returned `NonResumableParallelUploadState` will contain streams to which
 * data can be uploaded in parallel.
 *
 * @param client the client on which to perform the operation.
 * @param bucket_name the name of the bucket that will contain the object.
 * @param object_name the uploaded object name.
 * @param num_shards how many streams to upload the object through.
 * @param prefix the prefix with which temporary objects will be created.
 * @param options a list of optional query parameters and/or request headers.
 *     Valid types for this operation include `DestinationPredefinedAcl`,
 *     `EncryptionKey`, `IfGenerationMatch`, `IfMetagenerationMatch`,
 *     `KmsKeyName`, `QuotaUser`, `UserIp`, `UserProject`, `WithObjectMetadata`.
 *
 * @return the state of the parallel upload
 *
 * @par Idempotency
 * This operation is not idempotent. While each request performed by this
 * function is retried based on the client policies, the operation itself stops
 * on the first request that fails.
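 *
 * @par Example (sketch)
 * A minimal sketch, assuming an initialized `Client` and the usual
 * `namespace gcs = ::google::cloud::storage;` alias; the bucket, object,
 * project, and prefix names are placeholders:
 * @code
 * auto state = gcs::internal::PrepareParallelUpload(
 *     client, "my-bucket", "my-object", 4, "tmp-prefix",
 *     gcs::UserProject("my-project"));
 * if (!state) throw std::runtime_error(state.status().message());
 * // Write to state->shards(), close them, then wait for the composed object.
 * auto metadata = state->WaitForCompletion().get();
 * @endcode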
 */
template <typename... Options,
          typename std::enable_if<
              NotAmong<typename std::decay<Options>::type...>::template TPred<
                  UseResumableUploadSession>::value,
              int>::type EnableIfNotResumable = 0>
StatusOr<NonResumableParallelUploadState> PrepareParallelUpload(
    Client client, std::string const& bucket_name,
    std::string const& object_name, std::size_t num_shards,
    std::string const& prefix, Options&&... options) {
  return NonResumableParallelUploadState::Create(
      std::move(client), bucket_name, object_name, num_shards, prefix,
      StaticTupleFilter<NotAmong<ParallelUploadExtraPersistentState>::TPred>(
          std::forward_as_tuple(std::forward<Options>(options)...)));
}

template <typename... Options,
          typename std::enable_if<
              Among<typename std::decay<Options>::type...>::template TPred<
                  UseResumableUploadSession>::value,
              int>::type EnableIfResumable = 0>
StatusOr<ResumableParallelUploadState> PrepareParallelUpload(
    Client client, std::string const& bucket_name,
    std::string const& object_name, std::size_t num_shards,
    std::string const& prefix, Options&&... options) {
  auto resumable_args =
      StaticTupleFilter<Among<UseResumableUploadSession>::TPred>(
          std::tie(options...));
  static_assert(
      std::tuple_size<decltype(resumable_args)>::value == 1,
      "There should be exactly one UseResumableUploadSession argument");
  std::string resumable_session_id = std::get<0>(resumable_args).value();
  auto extra_state_arg =
      ExtractFirstOccurenceOfType<ParallelUploadExtraPersistentState>(
          std::tie(options...));

  auto forwarded_args =
      StaticTupleFilter<NotAmong<UseResumableUploadSession,
                                 ParallelUploadExtraPersistentState>::TPred>(
          std::forward_as_tuple(std::forward<Options>(options)...));

  if (resumable_session_id.empty()) {
    return ResumableParallelUploadState::CreateNew(
        std::move(client), bucket_name, object_name, num_shards, prefix,
        extra_state_arg ? std::move(extra_state_arg).value().payload()
                        : std::string(),
        std::move(forwarded_args));
  }
  return ResumableParallelUploadState::Resume(
      std::move(client), bucket_name, object_name, num_shards, prefix,
      resumable_session_id, std::move(forwarded_args));
}

template <typename... Options>
StatusOr<NonResumableParallelUploadState>
NonResumableParallelUploadState::Create(Client client,
                                        std::string const& bucket_name,
                                        std::string const& object_name,
                                        std::size_t num_shards,
                                        std::string const& prefix,
                                        std::tuple<Options...> options) {
  using internal::StaticTupleFilter;
  auto delete_options =
      StaticTupleFilter<Among<QuotaUser, UserProject, UserIp>::TPred>(options);
  auto deleter = std::make_shared<ScopedDeleter>(
      [client, bucket_name, delete_options](std::string const& object_name,
                                            std::int64_t generation) mutable {
        return google::cloud::internal::apply(
            DeleteApplyHelper{client, std::move(bucket_name), object_name},
            std::tuple_cat(std::make_tuple(IfGenerationMatch(generation)),
                           std::move(delete_options)));
      });

  auto compose_options = StaticTupleFilter<
      Among<DestinationPredefinedAcl, EncryptionKey, IfGenerationMatch,
            IfMetagenerationMatch, KmsKeyName, QuotaUser, UserIp, UserProject,
            WithObjectMetadata>::TPred>(options);
  auto composer = [client, bucket_name, object_name, compose_options, prefix](
                      std::vector<ComposeSourceObject> const& sources) mutable {
    return google::cloud::internal::apply(
        ComposeManyApplyHelper{client, std::move(bucket_name),
                               std::move(sources), prefix + ".compose_many",
                               std::move(object_name)},
        std::move(compose_options));
  };

  auto lock = internal::LockPrefix(client, bucket_name, prefix, options);
  if (!lock) {
    return Status(
        lock.status().code(),
        "Failed to lock prefix for ParallelUpload: " + lock.status().message());
  }
  deleter->Add(*lock);

  auto internal_state = std::make_shared<ParallelUploadStateImpl>(
      true, object_name, 0, std::move(deleter), std::move(composer));
  std::vector<ObjectWriteStream> streams;

  auto upload_options = StaticTupleFilter<
      Among<ContentEncoding, ContentType, DisableCrc32cChecksum, DisableMD5Hash,
            EncryptionKey, KmsKeyName, PredefinedAcl, UserProject,
            WithObjectMetadata>::TPred>(std::move(options));
  auto& raw_client = *client.raw_client_;
  for (std::size_t i = 0; i < num_shards; ++i) {
    ResumableUploadRequest request(
        bucket_name, prefix + ".upload_shard_" + std::to_string(i));
    google::cloud::internal::apply(SetOptionsApplyHelper(request),
                                   upload_options);
    auto stream = internal_state->CreateStream(raw_client, request);
    if (!stream) {
      return stream.status();
    }
    streams.emplace_back(*std::move(stream));
  }
  return NonResumableParallelUploadState(std::move(internal_state),
                                         std::move(streams));
}

template <typename... Options>
std::shared_ptr<ScopedDeleter> ResumableParallelUploadState::CreateDeleter(
    Client client,  // NOLINT(performance-unnecessary-value-param)
    std::string const& bucket_name, std::tuple<Options...> const& options) {
  using internal::StaticTupleFilter;
  auto delete_options =
      StaticTupleFilter<Among<QuotaUser, UserProject, UserIp>::TPred>(options);
  return std::make_shared<ScopedDeleter>(
      [client, bucket_name, delete_options](std::string const& object_name,
                                            std::int64_t generation) mutable {
        return google::cloud::internal::apply(
            DeleteApplyHelper{client, std::move(bucket_name), object_name},
            std::tuple_cat(std::make_tuple(IfGenerationMatch(generation)),
                           std::move(delete_options)));
      });
}

template <typename... Options>
Composer ResumableParallelUploadState::CreateComposer(
    Client client,  // NOLINT(performance-unnecessary-value-param)
    std::string const& bucket_name, std::string const& object_name,
    std::int64_t expected_generation, std::string const& prefix,
    std::tuple<Options...> const& options) {
  auto compose_options = std::tuple_cat(
      StaticTupleFilter<
          Among<DestinationPredefinedAcl, EncryptionKey, KmsKeyName, QuotaUser,
                UserIp, UserProject, WithObjectMetadata>::TPred>(options),
      std::make_tuple(IfGenerationMatch(expected_generation)));
  auto get_metadata_options = StaticTupleFilter<
      Among<DestinationPredefinedAcl, EncryptionKey, KmsKeyName, QuotaUser,
            UserIp, UserProject, WithObjectMetadata>::TPred>(options);
  auto composer =
      [client, bucket_name, object_name, compose_options, get_metadata_options,
       prefix](std::vector<ComposeSourceObject> const& sources) mutable
      -> StatusOr<ObjectMetadata> {
    auto res = google::cloud::internal::apply(
        ComposeManyApplyHelper{client, bucket_name, std::move(sources),
                               prefix + ".compose_many", object_name},
        std::move(compose_options));
    if (res) {
      return res;
    }
    if (res.status().code() != StatusCode::kFailedPrecondition) {
      return res.status();
    }
    // This means that the destination object already exists, but it is not the
    // object that existed when the parallel upload started. For simplicity, we
    // assume that it is the result of a previously interrupted ComposeMany
    // invocation.
    return google::cloud::internal::apply(
        GetObjectMetadataApplyHelper{client, std::move(bucket_name),
                                     std::move(object_name)},
        std::move(get_metadata_options));
  };
  return Composer(std::move(composer));
}

template <typename... Options>
StatusOr<ResumableParallelUploadState> ResumableParallelUploadState::CreateNew(
    Client client, std::string const& bucket_name,
    std::string const& object_name, std::size_t num_shards,
    std::string const& prefix, std::string const& extra_state,
    std::tuple<Options...> const& options) {
  using internal::StaticTupleFilter;

  auto get_object_meta_options = StaticTupleFilter<
      Among<IfGenerationMatch, IfGenerationNotMatch, IfMetagenerationMatch,
            IfMetagenerationNotMatch, UserProject>::TPred>(options);
  auto object_meta = google::cloud::internal::apply(
      GetObjectMetadataApplyHelper{client, bucket_name, object_name},
      std::move(get_object_meta_options));
  if (!object_meta && object_meta.status().code() != StatusCode::kNotFound) {
    return object_meta.status();
  }
  std::int64_t expected_generation =
      object_meta ? object_meta->generation() : 0;

  auto deleter = CreateDeleter(client, bucket_name, options);
  auto composer = CreateComposer(client, bucket_name, object_name,
                                 expected_generation, prefix, options);
  auto internal_state = std::make_shared<ParallelUploadStateImpl>(
      false, object_name, expected_generation, deleter, std::move(composer));
  internal_state->set_custom_data(std::move(extra_state));

  std::vector<ObjectWriteStream> streams;

  auto upload_options = std::tuple_cat(
      StaticTupleFilter<
          Among<ContentEncoding, ContentType, DisableCrc32cChecksum,
                DisableMD5Hash, EncryptionKey, KmsKeyName, PredefinedAcl,
                UserProject, WithObjectMetadata>::TPred>(options),
      std::make_tuple(UseResumableUploadSession("")));
  auto& raw_client = *client.raw_client_;
  for (std::size_t i = 0; i < num_shards; ++i) {
    ResumableUploadRequest request(
        bucket_name, prefix + ".upload_shard_" + std::to_string(i));
    google::cloud::internal::apply(SetOptionsApplyHelper(request),
                                   upload_options);
    auto stream = internal_state->CreateStream(raw_client, request);
    if (!stream) {
      return stream.status();
    }
    streams.emplace_back(*std::move(stream));
  }

  auto state_object_name = prefix + ".upload_state";
  auto insert_options = std::tuple_cat(
      std::make_tuple(IfGenerationMatch(0)),
      StaticTupleFilter<
          Among<PredefinedAcl, EncryptionKey, KmsKeyName, QuotaUser, UserIp,
                UserProject, WithObjectMetadata>::TPred>(options));
  auto state_object = google::cloud::internal::apply(
      InsertObjectApplyHelper{client, bucket_name, state_object_name,
                              internal_state->ToPersistentState().ToString()},
      std::move(insert_options));
  if (!state_object) {
    internal_state->Fail(state_object.status());
    return std::move(state_object).status();
  }
  std::string resumable_session_id = session_id_prefix() + state_object_name +
                                     ":" +
                                     std::to_string(state_object->generation());
  internal_state->set_resumable_session_id(resumable_session_id);
  deleter->Add(std::move(*state_object));
  return ResumableParallelUploadState(std::move(resumable_session_id),
                                      std::move(internal_state),
                                      std::move(streams));
}

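/**
 * Parse a parallel upload resumable session id.
 *
 * As created by `ResumableParallelUploadState::CreateNew()`, the session id is
 * `session_id_prefix()` ("ParUpl:") followed by the name of the state object
 * and that object's generation, e.g. "ParUpl:some-prefix.upload_state:1234"
 * (the prefix and generation here are illustrative placeholders).
 *
 * @return the state object's name and generation on success.
 */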
StatusOr<std::pair<std::string, std::int64_t>> ParseResumableSessionId(
    std::string const& session_id);

template <typename... Options>
StatusOr<ResumableParallelUploadState> ResumableParallelUploadState::Resume(
    Client client, std::string const& bucket_name,
    std::string const& object_name, std::size_t num_shards,
    std::string const& prefix, std::string const& resumable_session_id,
    std::tuple<Options...> options) {
  using internal::StaticTupleFilter;

  auto state_and_gen = ParseResumableSessionId(resumable_session_id);
  if (!state_and_gen) {
    return state_and_gen.status();
  }

  auto read_options = std::tuple_cat(
      StaticTupleFilter<Among<DisableCrc32cChecksum, DisableMD5Hash,
                              EncryptionKey, Generation, UserProject>::TPred>(
          options),
      std::make_tuple(IfGenerationMatch(state_and_gen->second)));

  auto state_stream = google::cloud::internal::apply(
      ReadObjectApplyHelper{client, bucket_name, state_and_gen->first},
      std::move(read_options));
  std::string state_string(std::istreambuf_iterator<char>{state_stream}, {});
  state_stream.Close();

  auto persistent_state =
      ParallelUploadPersistentState::FromString(state_string);
  if (!persistent_state) {
    return persistent_state.status();
  }

  if (persistent_state->destination_object_name != object_name) {
    return Status(StatusCode::kInternal,
                  "The specified resumable session ID does not match the "
                  "destination object name (" +
                      object_name + " vs " +
                      persistent_state->destination_object_name + ")");
  }
  if (persistent_state->streams.size() != num_shards && num_shards != 0) {
    return Status(StatusCode::kInternal,
                  "The specified resumable session ID does not match the "
                  "previously specified number of shards (" +
                      std::to_string(num_shards) + " vs " +
                      std::to_string(persistent_state->streams.size()) + ")");
  }

  auto deleter = CreateDeleter(client, bucket_name, options);
  deleter->Add(state_and_gen->first, state_and_gen->second);
  auto composer =
      CreateComposer(client, bucket_name, object_name,
                     persistent_state->expected_generation, prefix, options);
  auto internal_state = std::make_shared<ParallelUploadStateImpl>(
      false, object_name, persistent_state->expected_generation, deleter,
      std::move(composer));
  internal_state->set_custom_data(std::move(persistent_state->custom_data));
  internal_state->set_resumable_session_id(resumable_session_id);
  // If a resumed stream is already finalized, callbacks from streams will be
  // executed immediately. We don't want them to trigger composition before all
  // of them are created.
  internal_state->PreventFromFinishing();
  std::vector<ObjectWriteStream> streams;

  auto upload_options = StaticTupleFilter<
      Among<ContentEncoding, ContentType, DisableCrc32cChecksum, DisableMD5Hash,
            EncryptionKey, KmsKeyName, PredefinedAcl, UserProject,
            WithObjectMetadata>::TPred>(std::move(options));
  auto& raw_client = *client.raw_client_;
  for (auto& stream_desc : persistent_state->streams) {
    ResumableUploadRequest request(bucket_name,
                                   std::move(stream_desc.object_name));
    google::cloud::internal::apply(
        SetOptionsApplyHelper(request),
        std::tuple_cat(upload_options,
                       std::make_tuple(UseResumableUploadSession(
                           std::move(stream_desc.resumable_session_id)))));
    auto stream = internal_state->CreateStream(raw_client, request);
    if (!stream) {
      internal_state->AllowFinishing();
      return stream.status();
    }
    streams.emplace_back(*std::move(stream));
  }

  internal_state->AllowFinishing();
  return ResumableParallelUploadState(std::move(resumable_session_id),
                                      std::move(internal_state),
                                      std::move(streams));
}

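/**
 * Compute the file offsets at which to split a parallel file upload.
 *
 * The number of shards is bounded by `MaxStreams` (default 64), and every
 * shard except possibly the last one is at least `MinStreamSize` (default
 * 32 MiB) long. As an illustration, with the defaults a 1 GiB file is split
 * into 32 shards of 32 MiB each, so the function returns the 31 interior
 * split points; the end of the file is not included in the result.
 */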
template <typename... Options>
std::vector<std::uintmax_t> ComputeParallelFileUploadSplitPoints(
    std::uintmax_t file_size, std::tuple<Options...> const& options) {
  auto div_ceil = [](std::uintmax_t dividend, std::uintmax_t divisor) {
    return (dividend + divisor - 1) / divisor;
  };
  // These defaults were obtained by experiments summarized in
  // https://github.com/googleapis/google-cloud-cpp/issues/2951#issuecomment-566237128
  MaxStreams const default_max_streams(64);
  MinStreamSize const default_min_stream_size(32 * 1024 * 1024);

  auto const min_stream_size =
      (std::max<std::uintmax_t>)(1, ExtractFirstOccurenceOfType<MinStreamSize>(
                                        options)
                                        .value_or(default_min_stream_size)
                                        .value());
  auto const max_streams = ExtractFirstOccurenceOfType<MaxStreams>(options)
                               .value_or(default_max_streams)
                               .value();

  auto const wanted_num_streams =
      (std::max<
          std::uintmax_t>)(1, (std::min<std::uintmax_t>)(max_streams,
                                                         div_ceil(
                                                             file_size,
                                                             min_stream_size)));

  auto const stream_size =
      (std::max<std::uintmax_t>)(1, div_ceil(file_size, wanted_num_streams));

  std::vector<std::uintmax_t> res;
  for (auto split = stream_size; split < file_size; split += stream_size) {
    res.push_back(split);
  }
  return res;
}

std::string ParallelFileUploadSplitPointsToString(
    std::vector<std::uintmax_t> const& split_points);

StatusOr<std::vector<std::uintmax_t>> ParallelFileUploadSplitPointsFromString(
    std::string const& s);

/**
 * Helper functor to call `PrepareParallelUpload` via `apply`.
 *
 * This object holds only references to objects, hence it should not be stored.
 * Instead, it should be used only as a transient object allowing for calling
 * `PrepareParallelUpload` via `apply`.
 */
struct PrepareParallelUploadApplyHelper {
  // Some gcc versions crash on using decltype for return type here.
  template <typename... Options>
  StatusOr<typename std::conditional<
      Among<typename std::decay<Options>::type...>::template TPred<
          UseResumableUploadSession>::value,
      ResumableParallelUploadState, NonResumableParallelUploadState>::type>
  operator()(Options&&... options) {
    return PrepareParallelUpload(std::move(client), bucket_name, object_name,
                                 num_shards, prefix,
                                 std::forward<Options>(options)...);
  }

  Client client;
  std::string const& bucket_name;
  std::string const& object_name;
  std::size_t num_shards;
  std::string const& prefix;
};

struct CreateParallelUploadShards {
  /**
   * Prepare a parallel upload of a given file.
   *
   * The returned opaque objects reflect computed shards of the given file. Each
   * of them has an `Upload()` member function which will perform the upload of
   * that shard. You should parallelize running this function on them according
   * to your needs. You can affect how many shards will be created by using the
   * `MaxStreams` and `MinStreamSize` options.
   *
   * Any of the returned objects can be used for obtaining the metadata of the
   * resulting object.
   *
   * @param client the client on which to perform the operation.
   * @param file_name the path to the file to be uploaded.
   * @param bucket_name the name of the bucket that will contain the object.
   * @param object_name the uploaded object name.
   * @param prefix the prefix with which temporary objects will be created.
   * @param options a list of optional query parameters and/or request headers.
   *     Valid types for this operation include `DestinationPredefinedAcl`,
   *     `EncryptionKey`, `IfGenerationMatch`, `IfMetagenerationMatch`,
   *     `KmsKeyName`, `MaxStreams`, `MinStreamSize`, `QuotaUser`, `UserIp`,
   *     `UserProject`, `WithObjectMetadata`, `UseResumableUploadSession`.
   *
   * @return the shards of the input file to be uploaded in parallel
   *
   * @par Idempotency
   * This operation is not idempotent. While each request performed by this
   * function is retried based on the client policies, the operation itself
   * stops on the first request that fails.
   *
   * @par Example
   * @snippet storage_object_file_transfer_samples.cc parallel upload file
   */
  template <typename... Options>
  static StatusOr<std::vector<ParallelUploadFileShard>> Create(
      Client client,  // NOLINT(performance-unnecessary-value-param)
      std::string file_name, std::string const& bucket_name,
      std::string const& object_name, std::string const& prefix,
      Options&&... options) {
    std::error_code size_err;
    auto file_size = google::cloud::internal::file_size(file_name, size_err);
    if (size_err) {
      return Status(StatusCode::kNotFound, size_err.message());
    }

    auto const resumable_session_id_arg =
        ExtractFirstOccurenceOfType<UseResumableUploadSession>(
            std::tie(options...));
    bool const new_session = !resumable_session_id_arg ||
                             resumable_session_id_arg.value().value().empty();
    auto upload_options =
        StaticTupleFilter<NotAmong<MaxStreams, MinStreamSize>::TPred>(
            std::tie(options...));

    std::vector<uintmax_t> file_split_points;
    std::size_t num_shards = 0;
    if (new_session) {
      file_split_points =
          ComputeParallelFileUploadSplitPoints(file_size, std::tie(options...));
      num_shards = file_split_points.size() + 1;
    }

    // Create the upload state.
    auto state = google::cloud::internal::apply(
        PrepareParallelUploadApplyHelper{client, bucket_name, object_name,
                                         num_shards, prefix},
        std::tuple_cat(
            std::move(upload_options),
            std::make_tuple(ParallelUploadExtraPersistentState(
                ParallelFileUploadSplitPointsToString(file_split_points)))));
    if (!state) {
      return state.status();
    }

    if (!new_session) {
      // We need to recreate the split points of the file.
      auto maybe_split_points =
          ParallelFileUploadSplitPointsFromString(state->impl_->custom_data());
      if (!maybe_split_points) {
        state->Fail(maybe_split_points.status());
        return std::move(maybe_split_points).status();
      }
      file_split_points = *std::move(maybe_split_points);
    }

    // Everything is ready - we have the shared state and the file open. Let's
    // prepare the returned objects.
    auto upload_buffer_size =
        client.raw_client()->client_options().upload_buffer_size();

    file_split_points.emplace_back(file_size);
    assert(file_split_points.size() == state->shards().size());
    std::vector<ParallelUploadFileShard> res;
    std::uintmax_t offset = 0;
    std::size_t shard_idx = 0;
    for (auto shard_end : file_split_points) {
      res.emplace_back(ParallelUploadFileShard(
          state->impl_, std::move(state->shards()[shard_idx++]), file_name,
          offset, shard_end - offset, upload_buffer_size));
      offset = shard_end;
    }
#if defined(__clang__) && \
    (__clang_major__ < 4 || (__clang_major__ == 3 && __clang_minor__ <= 8))
    // The extra std::move() is required to work around a Clang <= 3.8 bug,
    // which tries to copy the result otherwise.
    return std::move(res);
#else
    return res;
#endif
  }
};

/// @copydoc CreateParallelUploadShards::Create()
template <typename... Options>
StatusOr<std::vector<ParallelUploadFileShard>> CreateUploadShards(
    Client client,  // NOLINT(performance-unnecessary-value-param)
    std::string file_name, std::string const& bucket_name,
    std::string const& object_name, std::string const& prefix,
    Options&&... options) {
  return CreateParallelUploadShards::Create(
      std::move(client), std::move(file_name), bucket_name, object_name, prefix,
      std::forward<Options>(options)...);
}

}  // namespace internal

/**
 * Perform a parallel upload of a given file.
 *
 * You can affect how many shards will be created by using the `MaxStreams` and
 * `MinStreamSize` options.
 *
 * @param client the client on which to perform the operation.
 * @param file_name the path to the file to be uploaded.
 * @param bucket_name the name of the bucket that will contain the object.
 * @param object_name the uploaded object name.
 * @param prefix the prefix with which temporary objects will be created.
 * @param ignore_cleanup_failures treat failures to clean up the temporary
 *     objects as not fatal.
 * @param options a list of optional query parameters and/or request headers.
 *     Valid types for this operation include `DestinationPredefinedAcl`,
 *     `EncryptionKey`, `IfGenerationMatch`, `IfMetagenerationMatch`,
 *     `KmsKeyName`, `MaxStreams`, `MinStreamSize`, `QuotaUser`, `UserIp`,
 *     `UserProject`, `WithObjectMetadata`, `UseResumableUploadSession`.
 *
 * @return the metadata of the object created by the upload.
 *
 * @par Idempotency
 * This operation is not idempotent. While each request performed by this
 * function is retried based on the client policies, the operation itself stops
 * on the first request that fails.
 *
 * @par Example
 * @snippet storage_object_file_transfer_samples.cc parallel upload file
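 *
 * @par Example (sketch)
 * A minimal sketch, assuming an initialized `Client` and the usual
 * `namespace gcs = ::google::cloud::storage;` alias; the file, bucket, object,
 * and prefix names are placeholders, and `true` makes cleanup failures
 * non-fatal:
 * @code
 * auto metadata = gcs::ParallelUploadFile(
 *     client, "/tmp/some-file", "my-bucket", "my-object", "tmp-prefix", true,
 *     gcs::MaxStreams(16));
 * if (!metadata) throw std::runtime_error(metadata.status().message());
 * std::cout << "Uploaded generation " << metadata->generation() << "\n";
 * @endcode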
 */
template <typename... Options>
StatusOr<ObjectMetadata> ParallelUploadFile(
    Client client, std::string file_name, std::string bucket_name,
    std::string object_name, std::string prefix, bool ignore_cleanup_failures,
    Options&&... options) {
  auto shards = internal::CreateParallelUploadShards::Create(
      std::move(client), std::move(file_name), std::move(bucket_name),
      std::move(object_name), std::move(prefix),
      std::forward<Options>(options)...);
  if (!shards) {
    return shards.status();
  }

  std::vector<std::thread> threads;
  threads.reserve(shards->size());
  for (auto& shard : *shards) {
    threads.emplace_back([&shard] {
      // We can safely ignore the status - if something fails we'll know
      // when obtaining final metadata.
      shard.Upload();
    });
  }
  for (auto& thread : threads) {
    thread.join();
  }
  auto res = (*shards)[0].WaitForCompletion().get();
  auto cleanup_res = (*shards)[0].EagerCleanup();
  if (!cleanup_res.ok() && !ignore_cleanup_failures) {
    return cleanup_res;
  }
  return res;
}

}  // namespace STORAGE_CLIENT_NS
}  // namespace storage
}  // namespace cloud
}  // namespace google

#endif  // GOOGLE_CLOUD_CPP_GOOGLE_CLOUD_STORAGE_PARALLEL_UPLOAD_H