1 /**
2  * @file   s3.cc
3  *
4  * @section LICENSE
5  *
6  * The MIT License
7  *
8  * @copyright Copyright (c) 2017-2021 TileDB, Inc.
9  *
10  * Permission is hereby granted, free of charge, to any person obtaining a copy
11  * of this software and associated documentation files (the "Software"), to deal
12  * in the Software without restriction, including without limitation the rights
13  * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
14  * copies of the Software, and to permit persons to whom the Software is
15  * furnished to do so, subject to the following conditions:
16  *
17  * The above copyright notice and this permission notice shall be included in
18  * all copies or substantial portions of the Software.
19  *
20  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
21  * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
22  * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
23  * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
24  * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
25  * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
26  * THE SOFTWARE.
27  *
28  * @section DESCRIPTION
29  *
30  * This file implements the S3 class.
31  */
32 
33 #ifdef HAVE_S3
34 
35 #include <aws/core/utils/logging/AWSLogging.h>
36 #include <aws/core/utils/logging/DefaultLogSystem.h>
37 #include <aws/core/utils/logging/LogLevel.h>
38 #include <aws/core/utils/memory/stl/AWSString.h>
39 #include <aws/s3/model/AbortMultipartUploadRequest.h>
40 #include <aws/s3/model/CreateMultipartUploadRequest.h>
41 #include <boost/interprocess/streams/bufferstream.hpp>
42 #include <fstream>
43 #include <iostream>
44 
45 #include "tiledb/common/logger.h"
46 #include "tiledb/common/unique_rwlock.h"
47 #include "tiledb/sm/global_state/global_state.h"
48 #include "tiledb/sm/global_state/unit_test_config.h"
49 #include "tiledb/sm/misc/utils.h"
50 
51 #ifdef _WIN32
52 #if !defined(NOMINMAX)
53 #define NOMINMAX
54 #endif
55 #include <Windows.h>
56 #undef GetMessage  // workaround for
57                    // https://github.com/aws/aws-sdk-cpp/issues/402
58 #endif
59 
60 #include "tiledb/sm/filesystem/s3.h"
61 #include "tiledb/sm/misc/parallel_functions.h"
62 
63 namespace {
64 
aws_log_name_to_level(std::string loglevel)65 Aws::Utils::Logging::LogLevel aws_log_name_to_level(std::string loglevel) {
66   std::transform(loglevel.begin(), loglevel.end(), loglevel.begin(), ::tolower);
67   if (loglevel == "fatal")
68     return Aws::Utils::Logging::LogLevel::Fatal;
69   else if (loglevel == "error")
70     return Aws::Utils::Logging::LogLevel::Error;
71   else if (loglevel == "warn")
72     return Aws::Utils::Logging::LogLevel::Warn;
73   else if (loglevel == "info")
74     return Aws::Utils::Logging::LogLevel::Info;
75   else if (loglevel == "debug")
76     return Aws::Utils::Logging::LogLevel::Debug;
77   else if (loglevel == "trace")
78     return Aws::Utils::Logging::LogLevel::Trace;
79   else
80     return Aws::Utils::Logging::LogLevel::Off;
81 }
82 
83 /**
84  * Return a S3 enum value for any recognized string or NOT_SET if
85  * B) the string is not recognized to match any of the enum values
86  *
87  * @param canned_acl_str A textual string naming one of the
88  *        Aws::S3::Model::ObjectCannedACL enum members.
89  */
S3_ObjectCannedACL_from_str(const std::string & canned_acl_str)90 Aws::S3::Model::ObjectCannedACL S3_ObjectCannedACL_from_str(
91     const std::string& canned_acl_str) {
92   if (canned_acl_str.empty())
93     return Aws::S3::Model::ObjectCannedACL::NOT_SET;
94 
95   if (canned_acl_str == "NOT_SET")
96     return Aws::S3::Model::ObjectCannedACL::NOT_SET;
97   else if (canned_acl_str == "private_")
98     return Aws::S3::Model::ObjectCannedACL::private_;
99   else if (canned_acl_str == "public_read")
100     return Aws::S3::Model::ObjectCannedACL::public_read;
101   else if (canned_acl_str == "public_read_write")
102     return Aws::S3::Model::ObjectCannedACL::public_read_write;
103   else if (canned_acl_str == "authenticated_read")
104     return Aws::S3::Model::ObjectCannedACL::authenticated_read;
105   else if (canned_acl_str == "aws_exec_read")
106     return Aws::S3::Model::ObjectCannedACL::aws_exec_read;
107   else if (canned_acl_str == "bucket_owner_read")
108     return Aws::S3::Model::ObjectCannedACL::bucket_owner_read;
109   else if (canned_acl_str == "bucket_owner_full_control")
110     return Aws::S3::Model::ObjectCannedACL::bucket_owner_full_control;
111   else
112     return Aws::S3::Model::ObjectCannedACL::NOT_SET;
113 }
114 
115 /**
116  * Return a S3 enum value for any recognized string or NOT_SET if
117  * B) the string is not recognized to match any of the enum values
118  *
119  * @param canned_acl_str A textual string naming one of the
120  *        Aws::S3::Model::BucketCannedACL enum members.
121  */
S3_BucketCannedACL_from_str(const std::string & canned_acl_str)122 Aws::S3::Model::BucketCannedACL S3_BucketCannedACL_from_str(
123     const std::string& canned_acl_str) {
124   if (canned_acl_str.empty())
125     return Aws::S3::Model::BucketCannedACL::NOT_SET;
126 
127   if (canned_acl_str == "NOT_SET")
128     return Aws::S3::Model::BucketCannedACL::NOT_SET;
129   else if (canned_acl_str == "private_")
130     return Aws::S3::Model::BucketCannedACL::private_;
131   else if (canned_acl_str == "public_read")
132     return Aws::S3::Model::BucketCannedACL::public_read;
133   else if (canned_acl_str == "public_read_write")
134     return Aws::S3::Model::BucketCannedACL::public_read_write;
135   else if (canned_acl_str == "authenticated_read")
136     return Aws::S3::Model::BucketCannedACL::authenticated_read;
137   else
138     return Aws::S3::Model::BucketCannedACL::NOT_SET;
139 }
140 
141 }  // namespace
142 
143 using namespace tiledb::common;
144 
145 namespace tiledb {
146 namespace sm {
147 
148 namespace {
149 
150 /**
151  * Return the exception name and error message from the given outcome object.
152  *
153  * @tparam R AWS result type
154  * @tparam E AWS error type
155  * @param outcome Outcome to retrieve error message from
156  * @return Error message string
157  */
158 template <typename R, typename E>
outcome_error_message(const Aws::Utils::Outcome<R,E> & outcome)159 std::string outcome_error_message(const Aws::Utils::Outcome<R, E>& outcome) {
160   return std::string("\nException:  ") +
161          outcome.GetError().GetExceptionName().c_str() +
162          std::string("\nError message:  ") +
163          outcome.GetError().GetMessage().c_str();
164 }
165 
166 }  // namespace
167 
168 /* ********************************* */
169 /*          GLOBAL VARIABLES         */
170 /* ********************************* */
171 
/**
 * Ensures that the AWS library is only initialized once per process.
 *
 * Used with std::call_once in S3::init(). As a consequence, the options
 * passed to Aws::InitAPI are those of the first S3 instance that reaches
 * that call (instances configured with `vfs.s3.skip_init` never do).
 */
static std::once_flag aws_lib_initialized;
174 
175 /* ********************************* */
176 /*     CONSTRUCTORS & DESTRUCTORS    */
177 /* ********************************* */
178 
S3()179 S3::S3()
180     : stats_(nullptr)
181     , state_(State::UNINITIALIZED)
182     , credentials_provider_(nullptr)
183     , file_buffer_size_(0)
184     , max_parallel_ops_(1)
185     , multipart_part_size_(0)
186     , vfs_thread_pool_(nullptr)
187     , use_virtual_addressing_(false)
188     , use_multipart_upload_(true)
189     , request_payer_(Aws::S3::Model::RequestPayer::NOT_SET)
190     , sse_(Aws::S3::Model::ServerSideEncryption::NOT_SET)
191     , object_canned_acl_(Aws::S3::Model::ObjectCannedACL::NOT_SET)
192     , bucket_canned_acl_(Aws::S3::Model::BucketCannedACL::NOT_SET) {
193 }
194 
~S3()195 S3::~S3() {
196   assert(state_ == State::DISCONNECTED);
197   for (auto& buff : file_buffers_)
198     tdb_delete(buff.second);
199 }
200 
201 /* ********************************* */
202 /*                 API               */
203 /* ********************************* */
204 
init(stats::Stats * const parent_stats,const Config & config,ThreadPool * const thread_pool)205 Status S3::init(
206     stats::Stats* const parent_stats,
207     const Config& config,
208     ThreadPool* const thread_pool) {
209   // already initialized
210   if (state_ == State::DISCONNECTED)
211     return Status::Ok();
212 
213   assert(state_ == State::UNINITIALIZED);
214 
215   stats_ = parent_stats->create_child("S3");
216 
217   if (thread_pool == nullptr) {
218     return LOG_STATUS(
219         Status::S3Error("Can't initialize with null thread pool."));
220   }
221 
222   bool found = false;
223   auto logging_level = config.get("vfs.s3.logging_level", &found);
224   assert(found);
225 
226   options_.loggingOptions.logLevel = aws_log_name_to_level(logging_level);
227 
228   // By default, curl sets the signal handler for SIGPIPE to SIG_IGN while
229   // executing. When curl is done executing, it restores the previous signal
230   // handler. This is not thread safe, so the AWS SDK disables this behavior
231   // in curl using the `CURLOPT_NOSIGNAL` option.
232   // Here, we set the `installSigPipeHandler` AWS SDK option to `true` to allow
233   // the AWS SDK to set its own signal handler to ignore SIGPIPE signals. A
234   // SIGPIPE may be raised from the socket library when the peer disconnects
235   // unexpectedly.
236   options_.httpOptions.installSigPipeHandler = true;
237 
238   bool skip_init;
239   RETURN_NOT_OK(config.get<bool>("vfs.s3.skip_init", &skip_init, &found));
240   assert(found);
241 
242   // Initialize the library once per process.
243   if (!skip_init)
244     std::call_once(aws_lib_initialized, [this]() { Aws::InitAPI(options_); });
245 
246   if (options_.loggingOptions.logLevel != Aws::Utils::Logging::LogLevel::Off) {
247     Aws::Utils::Logging::InitializeAWSLogging(
248         Aws::MakeShared<Aws::Utils::Logging::DefaultLogSystem>(
249             "TileDB", Aws::Utils::Logging::LogLevel::Trace, "tiledb_s3_"));
250   }
251 
252   vfs_thread_pool_ = thread_pool;
253   RETURN_NOT_OK(config.get<uint64_t>(
254       "vfs.s3.max_parallel_ops", &max_parallel_ops_, &found));
255   assert(found);
256   RETURN_NOT_OK(config.get<uint64_t>(
257       "vfs.s3.multipart_part_size", &multipart_part_size_, &found));
258   assert(found);
259   file_buffer_size_ = multipart_part_size_ * max_parallel_ops_;
260   region_ = config.get("vfs.s3.region", &found);
261   assert(found);
262   RETURN_NOT_OK(config.get<bool>(
263       "vfs.s3.use_virtual_addressing", &use_virtual_addressing_, &found));
264   assert(found);
265   RETURN_NOT_OK(config.get<bool>(
266       "vfs.s3.use_multipart_upload", &use_multipart_upload_, &found));
267   assert(found);
268 
269   bool request_payer;
270   RETURN_NOT_OK(
271       config.get<bool>("vfs.s3.requester_pays", &request_payer, &found));
272   assert(found);
273 
274   if (request_payer)
275     request_payer_ = Aws::S3::Model::RequestPayer::requester;
276 
277   auto object_acl_str = config.get("vfs.s3.object_canned_acl", &found);
278   assert(found);
279   if (found) {
280     object_canned_acl_ = S3_ObjectCannedACL_from_str(object_acl_str);
281   }
282 
283   auto bucket_acl_str = config.get("vfs.s3.bucket_canned_acl", &found);
284   assert(found);
285   if (found) {
286     bucket_canned_acl_ = S3_BucketCannedACL_from_str(bucket_acl_str);
287   }
288 
289   auto sse = config.get("vfs.s3.sse", &found);
290   assert(found);
291 
292   auto sse_kms_key_id = config.get("vfs.s3.sse_kms_key_id", &found);
293   assert(found);
294 
295   if (!sse.empty()) {
296     if (sse == "aes256") {
297       sse_ = Aws::S3::Model::ServerSideEncryption::AES256;
298     } else if (sse == "kms") {
299       sse_ = Aws::S3::Model::ServerSideEncryption::aws_kms;
300       sse_kms_key_id_ = sse_kms_key_id;
301       if (sse_kms_key_id_.empty()) {
302         return Status::S3Error(
303             "Config parameter 'vfs.s3.sse_kms_key_id' must be set "
304             "for kms server-side encryption.");
305       }
306     } else {
307       return Status::S3Error(
308           "Unknown 'vfs.s3.sse' config value " + sse +
309           "; supported values are 'aes256' and 'kms'.");
310     }
311   }
312 
313   // Ensure `sse_kms_key_id` was only set for kms encryption.
314   if (!sse_kms_key_id.empty() &&
315       sse_ != Aws::S3::Model::ServerSideEncryption::aws_kms) {
316     return Status::S3Error(
317         "Config parameter 'vfs.s3.sse_kms_key_id' may only be "
318         "set for 'vfs.s3.sse' == 'kms'.");
319   }
320 
321   config_ = config;
322 
323   state_ = State::INITIALIZED;
324   return Status::Ok();
325 }
326 
create_bucket(const URI & bucket) const327 Status S3::create_bucket(const URI& bucket) const {
328   RETURN_NOT_OK(init_client());
329 
330   if (!bucket.is_s3()) {
331     return LOG_STATUS(Status::S3Error(
332         std::string("URI is not an S3 URI: " + bucket.to_string())));
333   }
334 
335   Aws::Http::URI aws_uri = bucket.c_str();
336   Aws::S3::Model::CreateBucketRequest create_bucket_request;
337   create_bucket_request.SetBucket(aws_uri.GetAuthority());
338 
339   // Set the bucket location constraint equal to the S3 region.
340   // Note: empty string and 'us-east-1' are parsing errors in the SDK.
341   if (!region_.empty() && region_ != "us-east-1") {
342     Aws::S3::Model::CreateBucketConfiguration cfg;
343     Aws::String region_str(region_.c_str());
344     auto location_constraint = Aws::S3::Model::BucketLocationConstraintMapper::
345         GetBucketLocationConstraintForName(region_str);
346     cfg.SetLocationConstraint(location_constraint);
347     create_bucket_request.SetCreateBucketConfiguration(cfg);
348   }
349 
350   if (bucket_canned_acl_ != Aws::S3::Model::BucketCannedACL::NOT_SET) {
351     create_bucket_request.SetACL(bucket_canned_acl_);
352   }
353 
354   auto create_bucket_outcome = client_->CreateBucket(create_bucket_request);
355   if (!create_bucket_outcome.IsSuccess()) {
356     return LOG_STATUS(Status::S3Error(
357         std::string("Failed to create S3 bucket ") + bucket.to_string() +
358         outcome_error_message(create_bucket_outcome)));
359   }
360 
361   RETURN_NOT_OK(wait_for_bucket_to_be_created(bucket));
362 
363   return Status::Ok();
364 }
365 
remove_bucket(const URI & bucket) const366 Status S3::remove_bucket(const URI& bucket) const {
367   RETURN_NOT_OK(init_client());
368 
369   // Empty bucket
370   RETURN_NOT_OK(empty_bucket(bucket));
371 
372   // Delete bucket
373   Aws::Http::URI aws_uri = bucket.c_str();
374   Aws::S3::Model::DeleteBucketRequest delete_bucket_request;
375   delete_bucket_request.SetBucket(aws_uri.GetAuthority());
376   auto delete_bucket_outcome = client_->DeleteBucket(delete_bucket_request);
377   if (!delete_bucket_outcome.IsSuccess()) {
378     return LOG_STATUS(Status::S3Error(
379         std::string("Failed to remove S3 bucket ") + bucket.to_string() +
380         outcome_error_message(delete_bucket_outcome)));
381   }
382   return Status::Ok();
383 }
384 
disconnect()385 Status S3::disconnect() {
386   Status ret_st = Status::Ok();
387 
388   if (state_ == State::UNINITIALIZED) {
389     return ret_st;
390   }
391 
392   // Read-lock 'multipart_upload_states_'.
393   UniqueReadLock unique_rl(&multipart_upload_rwlock_);
394 
395   if (multipart_upload_states_.size() > 0) {
396     RETURN_NOT_OK(init_client());
397 
398     std::vector<const MultiPartUploadState*> states;
399     states.reserve(multipart_upload_states_.size());
400     for (auto& kv : multipart_upload_states_)
401       states.emplace_back(&kv.second);
402 
403     auto status =
404         parallel_for(vfs_thread_pool_, 0, states.size(), [&](uint64_t i) {
405           const MultiPartUploadState* state = states[i];
406           // Lock multipart state
407           std::unique_lock<std::mutex> state_lck(state->mtx);
408 
409           if (state->st.ok()) {
410             Aws::S3::Model::CompleteMultipartUploadRequest complete_request =
411                 make_multipart_complete_request(*state);
412             auto outcome = client_->CompleteMultipartUpload(complete_request);
413             if (!outcome.IsSuccess()) {
414               const Status st = LOG_STATUS(Status::S3Error(
415                   std::string("Failed to disconnect and flush S3 objects. ") +
416                   outcome_error_message(outcome)));
417               if (!st.ok()) {
418                 ret_st = st;
419               }
420             }
421           } else {
422             Aws::S3::Model::AbortMultipartUploadRequest abort_request =
423                 make_multipart_abort_request(*state);
424             auto outcome = client_->AbortMultipartUpload(abort_request);
425             if (!outcome.IsSuccess()) {
426               const Status st = LOG_STATUS(Status::S3Error(
427                   std::string("Failed to disconnect and flush S3 objects. ") +
428                   outcome_error_message(outcome)));
429               if (!st.ok()) {
430                 ret_st = st;
431               }
432             }
433           }
434           return Status::Ok();
435         });
436 
437     RETURN_NOT_OK(status);
438   }
439 
440   unique_rl.unlock();
441 
442   if (options_.loggingOptions.logLevel != Aws::Utils::Logging::LogLevel::Off) {
443     Aws::Utils::Logging::ShutdownAWSLogging();
444   }
445 
446   if (s3_tp_executor_) {
447     const Status st = s3_tp_executor_->Stop();
448     if (!st.ok()) {
449       ret_st = st;
450     }
451   }
452 
453   state_ = State::DISCONNECTED;
454   return ret_st;
455 }
456 
empty_bucket(const URI & bucket) const457 Status S3::empty_bucket(const URI& bucket) const {
458   RETURN_NOT_OK(init_client());
459 
460   auto uri_dir = bucket.add_trailing_slash();
461   return remove_dir(uri_dir);
462 }
463 
flush_object(const URI & uri)464 Status S3::flush_object(const URI& uri) {
465   RETURN_NOT_OK(init_client());
466   if (!use_multipart_upload_) {
467     return flush_direct(uri);
468   }
469   if (!uri.is_s3()) {
470     return LOG_STATUS(Status::S3Error(
471         std::string("URI is not an S3 URI: " + uri.to_string())));
472   }
473 
474   // Flush and delete file buffer. For multipart requests, we must
475   // continue even if 'flush_file_buffer' fails. In that scenario,
476   // we will send an abort request.
477   auto buff = (Buffer*)nullptr;
478   RETURN_NOT_OK(get_file_buffer(uri, &buff));
479   const Status flush_st = flush_file_buffer(uri, buff, true);
480 
481   Aws::Http::URI aws_uri = uri.c_str();
482   std::string path_c_str = aws_uri.GetPath().c_str();
483 
484   // Take a lock protecting 'multipart_upload_states_'.
485   UniqueReadLock unique_rl(&multipart_upload_rwlock_);
486 
487   // Do nothing - empty object
488   auto state_iter = multipart_upload_states_.find(path_c_str);
489   if (state_iter == multipart_upload_states_.end()) {
490     RETURN_NOT_OK(flush_st);
491     return Status::Ok();
492   }
493 
494   const MultiPartUploadState* state = &multipart_upload_states_.at(path_c_str);
495   // Lock multipart state
496   std::unique_lock<std::mutex> state_lck(state->mtx);
497   unique_rl.unlock();
498 
499   if (state->st.ok()) {
500     Aws::S3::Model::CompleteMultipartUploadRequest complete_request =
501         make_multipart_complete_request(*state);
502     auto outcome = client_->CompleteMultipartUpload(complete_request);
503     if (!outcome.IsSuccess()) {
504       return LOG_STATUS(Status::S3Error(
505           std::string("Failed to flush S3 object ") + uri.c_str() +
506           outcome_error_message(outcome)));
507     }
508 
509     auto bucket = state->bucket;
510     auto key = state->key;
511     // It is safe to unlock the state here
512     state_lck.unlock();
513 
514     wait_for_object_to_propagate(move(bucket), move(key));
515 
516     return finish_flush_object(std::move(outcome), uri, buff);
517   } else {
518     Aws::S3::Model::AbortMultipartUploadRequest abort_request =
519         make_multipart_abort_request(*state);
520 
521     auto outcome = client_->AbortMultipartUpload(abort_request);
522 
523     state_lck.unlock();
524 
525     return finish_flush_object(std::move(outcome), uri, buff);
526   }
527 }
528 
529 Aws::S3::Model::CompleteMultipartUploadRequest
make_multipart_complete_request(const MultiPartUploadState & state)530 S3::make_multipart_complete_request(const MultiPartUploadState& state) {
531   // Add all the completed parts (sorted by part number) to the upload object.
532   Aws::S3::Model::CompletedMultipartUpload completed_upload;
533   for (auto& tup : state.completed_parts) {
534     const Aws::S3::Model::CompletedPart& part = std::get<1>(tup);
535     completed_upload.AddParts(part);
536   }
537 
538   Aws::S3::Model::CompleteMultipartUploadRequest complete_request;
539   complete_request.SetBucket(state.bucket);
540   complete_request.SetKey(state.key);
541   complete_request.SetUploadId(state.upload_id);
542   if (request_payer_ != Aws::S3::Model::RequestPayer::NOT_SET)
543     complete_request.SetRequestPayer(request_payer_);
544   return complete_request.WithMultipartUpload(std::move(completed_upload));
545 }
546 
make_multipart_abort_request(const MultiPartUploadState & state)547 Aws::S3::Model::AbortMultipartUploadRequest S3::make_multipart_abort_request(
548     const MultiPartUploadState& state) {
549   Aws::S3::Model::AbortMultipartUploadRequest abort_request;
550   abort_request.SetBucket(state.bucket);
551   abort_request.SetKey(state.key);
552   abort_request.SetUploadId(state.upload_id);
553   if (request_payer_ != Aws::S3::Model::RequestPayer::NOT_SET)
554     abort_request.SetRequestPayer(request_payer_);
555   return abort_request;
556 }
557 
558 template <typename R, typename E>
finish_flush_object(const Aws::Utils::Outcome<R,E> & outcome,const URI & uri,Buffer * const buff)559 Status S3::finish_flush_object(
560     const Aws::Utils::Outcome<R, E>& outcome,
561     const URI& uri,
562     Buffer* const buff) {
563   Aws::Http::URI aws_uri = uri.c_str();
564 
565   UniqueWriteLock unique_wl(&multipart_upload_rwlock_);
566   multipart_upload_states_.erase(aws_uri.GetPath().c_str());
567   unique_wl.unlock();
568 
569   std::unique_lock<std::mutex> file_buffers_lck(file_buffers_mtx_);
570   file_buffers_.erase(uri.to_string());
571   file_buffers_lck.unlock();
572   tdb_delete(buff);
573 
574   if (!outcome.IsSuccess()) {
575     return LOG_STATUS(Status::S3Error(
576         std::string("Failed to flush S3 object ") + uri.c_str() +
577         outcome_error_message(outcome)));
578   }
579 
580   return Status::Ok();
581 }
582 
is_empty_bucket(const URI & bucket,bool * is_empty) const583 Status S3::is_empty_bucket(const URI& bucket, bool* is_empty) const {
584   RETURN_NOT_OK(init_client());
585 
586   bool exists;
587   RETURN_NOT_OK(is_bucket(bucket, &exists));
588   if (!exists)
589     return LOG_STATUS(Status::S3Error(
590         "Cannot check if bucket is empty; Bucket does not exist"));
591 
592   Aws::Http::URI aws_uri = bucket.c_str();
593   Aws::S3::Model::ListObjectsRequest list_objects_request;
594   list_objects_request.SetBucket(aws_uri.GetAuthority());
595   list_objects_request.SetPrefix("");
596   list_objects_request.SetDelimiter("/");
597   if (request_payer_ != Aws::S3::Model::RequestPayer::NOT_SET)
598     list_objects_request.SetRequestPayer(request_payer_);
599   auto list_objects_outcome = client_->ListObjects(list_objects_request);
600 
601   if (!list_objects_outcome.IsSuccess()) {
602     return LOG_STATUS(Status::S3Error(
603         std::string("Failed to list s3 objects in bucket ") + bucket.c_str() +
604         outcome_error_message(list_objects_outcome)));
605   }
606 
607   *is_empty = list_objects_outcome.GetResult().GetContents().empty() &&
608               list_objects_outcome.GetResult().GetCommonPrefixes().empty();
609 
610   return Status::Ok();
611 }
612 
is_bucket(const URI & uri,bool * const exists) const613 Status S3::is_bucket(const URI& uri, bool* const exists) const {
614   init_client();
615 
616   if (!uri.is_s3()) {
617     return LOG_STATUS(Status::S3Error(
618         std::string("URI is not an S3 URI: " + uri.to_string())));
619   }
620 
621   Aws::Http::URI aws_uri = uri.c_str();
622   Aws::S3::Model::HeadBucketRequest head_bucket_request;
623   head_bucket_request.SetBucket(aws_uri.GetAuthority());
624   auto head_bucket_outcome = client_->HeadBucket(head_bucket_request);
625   *exists = head_bucket_outcome.IsSuccess();
626 
627   return Status::Ok();
628 }
629 
is_object(const URI & uri,bool * const exists) const630 Status S3::is_object(const URI& uri, bool* const exists) const {
631   init_client();
632 
633   if (!uri.is_s3()) {
634     return LOG_STATUS(Status::S3Error(
635         std::string("URI is not an S3 URI: " + uri.to_string())));
636   }
637 
638   Aws::Http::URI aws_uri = uri.c_str();
639 
640   return is_object(aws_uri.GetAuthority(), aws_uri.GetPath(), exists);
641 }
642 
is_object(const Aws::String & bucket_name,const Aws::String & object_key,bool * const exists) const643 Status S3::is_object(
644     const Aws::String& bucket_name,
645     const Aws::String& object_key,
646     bool* const exists) const {
647   init_client();
648 
649   Aws::S3::Model::HeadObjectRequest head_object_request;
650   head_object_request.SetBucket(bucket_name);
651   head_object_request.SetKey(object_key);
652   if (request_payer_ != Aws::S3::Model::RequestPayer::NOT_SET)
653     head_object_request.SetRequestPayer(request_payer_);
654   auto head_object_outcome = client_->HeadObject(head_object_request);
655   *exists = head_object_outcome.IsSuccess();
656 
657   return Status::Ok();
658 }
659 
is_dir(const URI & uri,bool * exists) const660 Status S3::is_dir(const URI& uri, bool* exists) const {
661   RETURN_NOT_OK(init_client());
662 
663   // Potentially add `/` to the end of `uri`
664   auto uri_dir = uri.add_trailing_slash();
665   std::vector<std::string> paths;
666   RETURN_NOT_OK(ls(uri_dir, &paths, "/", 1));
667   *exists = (bool)paths.size();
668   return Status::Ok();
669 }
670 
ls(const URI & prefix,std::vector<std::string> * paths,const std::string & delimiter,int max_paths) const671 Status S3::ls(
672     const URI& prefix,
673     std::vector<std::string>* paths,
674     const std::string& delimiter,
675     int max_paths) const {
676   RETURN_NOT_OK(init_client());
677 
678   const auto prefix_dir = prefix.add_trailing_slash();
679 
680   auto prefix_str = prefix_dir.to_string();
681   if (!prefix_dir.is_s3()) {
682     return LOG_STATUS(
683         Status::S3Error(std::string("URI is not an S3 URI: " + prefix_str)));
684   }
685 
686   Aws::Http::URI aws_uri = prefix_str.c_str();
687   auto aws_prefix = remove_front_slash(aws_uri.GetPath().c_str());
688   std::string aws_auth = aws_uri.GetAuthority().c_str();
689   Aws::S3::Model::ListObjectsRequest list_objects_request;
690   list_objects_request.SetBucket(aws_uri.GetAuthority());
691   list_objects_request.SetPrefix(aws_prefix.c_str());
692   list_objects_request.SetDelimiter(delimiter.c_str());
693   if (request_payer_ != Aws::S3::Model::RequestPayer::NOT_SET)
694     list_objects_request.SetRequestPayer(request_payer_);
695 
696   bool is_done = false;
697   while (!is_done) {
698     // Not requesting more items than needed
699     if (max_paths != -1)
700       list_objects_request.SetMaxKeys(
701           max_paths - static_cast<int>(paths->size()));
702     auto list_objects_outcome = client_->ListObjects(list_objects_request);
703 
704     if (!list_objects_outcome.IsSuccess())
705       return LOG_STATUS(Status::S3Error(
706           std::string("Error while listing with prefix '") + prefix_str +
707           "' and delimiter '" + delimiter + "'" +
708           outcome_error_message(list_objects_outcome)));
709 
710     for (const auto& object : list_objects_outcome.GetResult().GetContents()) {
711       std::string file(object.GetKey().c_str());
712       paths->push_back("s3://" + aws_auth + add_front_slash(file));
713     }
714 
715     for (const auto& object :
716          list_objects_outcome.GetResult().GetCommonPrefixes()) {
717       std::string file(object.GetPrefix().c_str());
718       paths->push_back(
719           "s3://" + aws_auth + add_front_slash(remove_trailing_slash(file)));
720     }
721 
722     is_done =
723         !list_objects_outcome.GetResult().GetIsTruncated() ||
724         (max_paths != -1 && paths->size() >= static_cast<size_t>(max_paths));
725     if (!is_done) {
726       // The documentation states that "GetNextMarker" will be non-empty only
727       // when the delimiter in the request is non-empty. When the delimiter is
728       // non-empty, we must used the last returned key as the next marker.
729       assert(
730           !delimiter.empty() ||
731           !list_objects_outcome.GetResult().GetContents().empty());
732       Aws::String next_marker =
733           !delimiter.empty() ?
734               list_objects_outcome.GetResult().GetNextMarker() :
735               list_objects_outcome.GetResult().GetContents().back().GetKey();
736       assert(!next_marker.empty());
737 
738       list_objects_request.SetMarker(std::move(next_marker));
739     }
740   }
741 
742   return Status::Ok();
743 }
744 
move_object(const URI & old_uri,const URI & new_uri)745 Status S3::move_object(const URI& old_uri, const URI& new_uri) {
746   RETURN_NOT_OK(init_client());
747 
748   RETURN_NOT_OK(copy_object(old_uri, new_uri));
749   RETURN_NOT_OK(remove_object(old_uri));
750   return Status::Ok();
751 }
752 
move_dir(const URI & old_uri,const URI & new_uri)753 Status S3::move_dir(const URI& old_uri, const URI& new_uri) {
754   RETURN_NOT_OK(init_client());
755 
756   std::vector<std::string> paths;
757   RETURN_NOT_OK(ls(old_uri, &paths, ""));
758   for (const auto& path : paths) {
759     auto suffix = path.substr(old_uri.to_string().size());
760     auto new_path = new_uri.join_path(suffix);
761     RETURN_NOT_OK(move_object(URI(path), URI(new_path)));
762   }
763 
764   return Status::Ok();
765 }
766 
copy_file(const URI & old_uri,const URI & new_uri)767 Status S3::copy_file(const URI& old_uri, const URI& new_uri) {
768   RETURN_NOT_OK(init_client());
769 
770   RETURN_NOT_OK(copy_object(old_uri, new_uri));
771   return Status::Ok();
772 }
773 
copy_dir(const URI & old_uri,const URI & new_uri)774 Status S3::copy_dir(const URI& old_uri, const URI& new_uri) {
775   RETURN_NOT_OK(init_client());
776 
777   std::string old_uri_string = old_uri.to_string();
778   std::vector<std::string> paths;
779   RETURN_NOT_OK(ls(old_uri, &paths));
780   while (!paths.empty()) {
781     std::string file_name_abs = paths.front();
782     URI file_name_uri = URI(file_name_abs);
783     std::string file_name = file_name_abs.substr(old_uri_string.length());
784     paths.erase(paths.begin());
785 
786     bool dir_exists;
787     RETURN_NOT_OK(is_dir(file_name_uri, &dir_exists));
788     if (dir_exists) {
789       std::vector<std::string> child_paths;
790       RETURN_NOT_OK(ls(file_name_uri, &child_paths));
791       paths.insert(paths.end(), child_paths.begin(), child_paths.end());
792     } else {
793       std::string new_path_string = new_uri.to_string() + file_name;
794       URI new_path_uri = URI(new_path_string);
795       RETURN_NOT_OK(copy_object(file_name_uri, new_path_uri));
796     }
797   }
798 
799   return Status::Ok();
800 }
801 
object_size(const URI & uri,uint64_t * nbytes) const802 Status S3::object_size(const URI& uri, uint64_t* nbytes) const {
803   RETURN_NOT_OK(init_client());
804 
805   if (!uri.is_s3()) {
806     return LOG_STATUS(Status::S3Error(
807         std::string("URI is not an S3 URI: " + uri.to_string())));
808   }
809 
810   Aws::Http::URI aws_uri = uri.to_string().c_str();
811   auto aws_path = remove_front_slash(aws_uri.GetPath().c_str());
812 
813   Aws::S3::Model::HeadObjectRequest head_object_request;
814   head_object_request.SetBucket(aws_uri.GetAuthority());
815   head_object_request.SetKey(aws_path.c_str());
816   if (request_payer_ != Aws::S3::Model::RequestPayer::NOT_SET)
817     head_object_request.SetRequestPayer(request_payer_);
818   auto head_object_outcome = client_->HeadObject(head_object_request);
819 
820   if (!head_object_outcome.IsSuccess())
821     return LOG_STATUS(Status::S3Error(
822         "Cannot retrieve S3 object size; Error while listing file " +
823         uri.to_string() + outcome_error_message(head_object_outcome)));
824   *nbytes =
825       static_cast<uint64_t>(head_object_outcome.GetResult().GetContentLength());
826 
827   return Status::Ok();
828 }
829 
read(const URI & uri,const off_t offset,void * const buffer,const uint64_t length,const uint64_t read_ahead_length,uint64_t * const length_returned) const830 Status S3::read(
831     const URI& uri,
832     const off_t offset,
833     void* const buffer,
834     const uint64_t length,
835     const uint64_t read_ahead_length,
836     uint64_t* const length_returned) const {
837   RETURN_NOT_OK(init_client());
838 
839   if (!uri.is_s3()) {
840     return LOG_STATUS(Status::S3Error(
841         std::string("URI is not an S3 URI: " + uri.to_string())));
842   }
843 
844   Aws::Http::URI aws_uri = uri.c_str();
845   Aws::S3::Model::GetObjectRequest get_object_request;
846   get_object_request.WithBucket(aws_uri.GetAuthority())
847       .WithKey(aws_uri.GetPath());
848   get_object_request.SetRange(
849       ("bytes=" + std::to_string(offset) + "-" +
850        std::to_string(offset + length + read_ahead_length - 1))
851           .c_str());
852   get_object_request.SetResponseStreamFactory(
853       [buffer, length, read_ahead_length]() {
854         return Aws::New<PreallocatedIOStream>(
855             constants::s3_allocation_tag.c_str(),
856             buffer,
857             length + read_ahead_length);
858       });
859 
860   if (request_payer_ != Aws::S3::Model::RequestPayer::NOT_SET)
861     get_object_request.SetRequestPayer(request_payer_);
862 
863   auto get_object_outcome = client_->GetObject(get_object_request);
864   if (!get_object_outcome.IsSuccess()) {
865     return LOG_STATUS(Status::S3Error(
866         std::string("Failed to read S3 object ") + uri.c_str() +
867         outcome_error_message(get_object_outcome)));
868   }
869 
870   *length_returned =
871       static_cast<uint64_t>(get_object_outcome.GetResult().GetContentLength());
872   if (*length_returned < length) {
873     return LOG_STATUS(Status::S3Error(
874         std::string("Read operation returned different size of bytes ") +
875         std::to_string(*length_returned) + " vs " + std::to_string(length)));
876   }
877 
878   return Status::Ok();
879 }
880 
remove_object(const URI & uri) const881 Status S3::remove_object(const URI& uri) const {
882   RETURN_NOT_OK(init_client());
883 
884   if (!uri.is_s3()) {
885     return LOG_STATUS(Status::S3Error(
886         std::string("URI is not an S3 URI: " + uri.to_string())));
887   }
888 
889   Aws::Http::URI aws_uri = uri.to_string().c_str();
890   Aws::S3::Model::DeleteObjectRequest delete_object_request;
891   delete_object_request.SetBucket(aws_uri.GetAuthority());
892   delete_object_request.SetKey(aws_uri.GetPath());
893   if (request_payer_ != Aws::S3::Model::RequestPayer::NOT_SET)
894     delete_object_request.SetRequestPayer(request_payer_);
895 
896   auto delete_object_outcome = client_->DeleteObject(delete_object_request);
897   if (!delete_object_outcome.IsSuccess()) {
898     return LOG_STATUS(Status::S3Error(
899         std::string("Failed to delete S3 object '") + uri.c_str() +
900         outcome_error_message(delete_object_outcome)));
901   }
902 
903   wait_for_object_to_be_deleted(
904       delete_object_request.GetBucket(), delete_object_request.GetKey());
905   return Status::Ok();
906 }
907 
remove_dir(const URI & uri) const908 Status S3::remove_dir(const URI& uri) const {
909   RETURN_NOT_OK(init_client());
910 
911   std::vector<std::string> paths;
912   auto uri_dir = uri.add_trailing_slash();
913   RETURN_NOT_OK(ls(uri_dir, &paths, ""));
914   for (const auto& p : paths)
915     RETURN_NOT_OK(remove_object(URI(p)));
916   return Status::Ok();
917 }
918 
touch(const URI & uri) const919 Status S3::touch(const URI& uri) const {
920   RETURN_NOT_OK(init_client());
921 
922   if (!uri.is_s3()) {
923     return LOG_STATUS(Status::S3Error(std::string(
924         "Cannot create file; URI is not an S3 URI: " + uri.to_string())));
925   }
926 
927   if (uri.to_string().back() == '/') {
928     return LOG_STATUS(Status::S3Error(std::string(
929         "Cannot create file; URI is a directory: " + uri.to_string())));
930   }
931 
932   bool exists;
933   RETURN_NOT_OK(is_object(uri, &exists));
934   if (exists) {
935     return Status::Ok();
936   }
937 
938   Aws::Http::URI aws_uri = uri.c_str();
939   Aws::S3::Model::PutObjectRequest put_object_request;
940   put_object_request.WithKey(aws_uri.GetPath())
941       .WithBucket(aws_uri.GetAuthority());
942 
943   auto request_stream =
944       Aws::MakeShared<Aws::StringStream>(constants::s3_allocation_tag.c_str());
945   put_object_request.SetBody(request_stream);
946   if (request_payer_ != Aws::S3::Model::RequestPayer::NOT_SET)
947     put_object_request.SetRequestPayer(request_payer_);
948   if (sse_ != Aws::S3::Model::ServerSideEncryption::NOT_SET)
949     put_object_request.SetServerSideEncryption(sse_);
950   if (!sse_kms_key_id_.empty())
951     put_object_request.SetSSEKMSKeyId(Aws::String(sse_kms_key_id_.c_str()));
952   if (object_canned_acl_ != Aws::S3::Model::ObjectCannedACL::NOT_SET) {
953     put_object_request.SetACL(object_canned_acl_);
954   }
955 
956   auto put_object_outcome = client_->PutObject(put_object_request);
957   if (!put_object_outcome.IsSuccess()) {
958     return LOG_STATUS(Status::S3Error(
959         std::string("Cannot touch object '") + uri.c_str() +
960         outcome_error_message(put_object_outcome)));
961   }
962 
963   wait_for_object_to_propagate(
964       put_object_request.GetBucket(), put_object_request.GetKey());
965 
966   return Status::Ok();
967 }
968 
write(const URI & uri,const void * buffer,uint64_t length)969 Status S3::write(const URI& uri, const void* buffer, uint64_t length) {
970   RETURN_NOT_OK(init_client());
971 
972   if (!uri.is_s3()) {
973     return LOG_STATUS(Status::S3Error(
974         std::string("URI is not an S3 URI: " + uri.to_string())));
975   }
976 
977   // This write is never considered the last part of an object. The last part is
978   // only uploaded with flush_object().
979   const bool is_last_part = false;
980 
981   // Get file buffer
982   auto buff = (Buffer*)nullptr;
983   RETURN_NOT_OK(get_file_buffer(uri, &buff));
984 
985   // Fill file buffer
986   uint64_t nbytes_filled;
987   RETURN_NOT_OK(fill_file_buffer(buff, buffer, length, &nbytes_filled));
988 
989   if ((!use_multipart_upload_) && (nbytes_filled != length)) {
990     std::stringstream errmsg;
991     errmsg << "Direct write failed! " << nbytes_filled
992            << " bytes written to buffer, " << length << " bytes requested.";
993     return LOG_STATUS(Status::S3Error(errmsg.str()));
994   }
995 
996   // Flush file buffer
997   // multipart objects will flush whenever the writes exceed file_buffer_size_
998   // write_direct should just append to buffer and upload later
999   if (use_multipart_upload_) {
1000     if (buff->size() == file_buffer_size_)
1001       RETURN_NOT_OK(flush_file_buffer(uri, buff, is_last_part));
1002 
1003     uint64_t new_length = length - nbytes_filled;
1004     uint64_t offset = nbytes_filled;
1005     // Write chunks
1006     while (new_length > 0) {
1007       if (new_length >= file_buffer_size_) {
1008         RETURN_NOT_OK(write_multipart(
1009             uri, (char*)buffer + offset, file_buffer_size_, is_last_part));
1010         offset += file_buffer_size_;
1011         new_length -= file_buffer_size_;
1012       } else {
1013         RETURN_NOT_OK(fill_file_buffer(
1014             buff, (char*)buffer + offset, new_length, &nbytes_filled));
1015         offset += nbytes_filled;
1016         new_length -= nbytes_filled;
1017       }
1018     }
1019     assert(offset == length);
1020   }
1021 
1022   return Status::Ok();
1023 }
1024 
1025 /* ********************************* */
1026 /*          PRIVATE METHODS          */
1027 /* ********************************* */
1028 
init_client() const1029 Status S3::init_client() const {
1030   assert(state_ == State::INITIALIZED);
1031 
1032   std::lock_guard<std::mutex> lck(client_init_mtx_);
1033 
1034   if (client_ != nullptr) {
1035     // Check credentials. If expired, referesh it
1036     if (credentials_provider_) {
1037       Aws::Auth::AWSCredentials credentials =
1038           credentials_provider_->GetAWSCredentials();
1039       if (credentials.IsExpiredOrEmpty()) {
1040         return LOG_STATUS(
1041             Status::S3Error(std::string("Credentials is expired or empty.")));
1042       }
1043     }
1044     return Status::Ok();
1045   }
1046 
1047   bool found;
1048   auto s3_endpoint_override = config_.get("vfs.s3.endpoint_override", &found);
1049   assert(found);
1050 
1051   // ClientConfiguration should be lazily init'ed here in init_client to avoid
1052   // potential slowdowns for non s3 users as the ClientConfig now attempts to
1053   // check for client configuration on create, which can be slow if aws is not
1054   // configured on a users systems due to ec2 metadata check
1055 
1056   client_config_ = tdb_unique_ptr<Aws::Client::ClientConfiguration>(
1057       tdb_new(Aws::Client::ClientConfiguration));
1058 
1059   s3_tp_executor_ = std::make_shared<S3ThreadPoolExecutor>(vfs_thread_pool_);
1060 
1061   client_config_->executor = s3_tp_executor_;
1062 
1063   auto& client_config = *client_config_.get();
1064 
1065   if (!region_.empty())
1066     client_config.region = region_.c_str();
1067 
1068   if (!s3_endpoint_override.empty())
1069     client_config.endpointOverride = s3_endpoint_override.c_str();
1070 
1071   auto proxy_host = config_.get("vfs.s3.proxy_host", &found);
1072   assert(found);
1073 
1074   uint32_t proxy_port = 0;
1075   RETURN_NOT_OK(
1076       config_.get<uint32_t>("vfs.s3.proxy_port", &proxy_port, &found));
1077   assert(found);
1078 
1079   auto proxy_username = config_.get("vfs.s3.proxy_username", &found);
1080   assert(found);
1081 
1082   auto proxy_password = config_.get("vfs.s3.proxy_password", &found);
1083   assert(found);
1084 
1085   auto proxy_scheme = config_.get("vfs.s3.proxy_scheme", &found);
1086   assert(found);
1087 
1088   if (!proxy_host.empty()) {
1089     client_config.proxyHost = proxy_host.c_str();
1090     client_config.proxyPort = proxy_port;
1091     client_config.proxyScheme = proxy_scheme == "https" ?
1092                                     Aws::Http::Scheme::HTTPS :
1093                                     Aws::Http::Scheme::HTTP;
1094     client_config.proxyUserName = proxy_username.c_str();
1095     client_config.proxyPassword = proxy_password.c_str();
1096   }
1097 
1098   auto s3_scheme = config_.get("vfs.s3.scheme", &found);
1099   assert(found);
1100 
1101   int64_t connect_timeout_ms = 0;
1102   RETURN_NOT_OK(config_.get<int64_t>(
1103       "vfs.s3.connect_timeout_ms", &connect_timeout_ms, &found));
1104   assert(found);
1105 
1106   int64_t request_timeout_ms = 0;
1107   RETURN_NOT_OK(config_.get<int64_t>(
1108       "vfs.s3.request_timeout_ms", &request_timeout_ms, &found));
1109   assert(found);
1110 
1111   auto ca_file = config_.get("vfs.s3.ca_file", &found);
1112   assert(found);
1113 
1114   auto ca_path = config_.get("vfs.s3.ca_path", &found);
1115   assert(found);
1116 
1117   bool verify_ssl = false;
1118   RETURN_NOT_OK(config_.get<bool>("vfs.s3.verify_ssl", &verify_ssl, &found));
1119   assert(found);
1120 
1121   auto aws_access_key_id = config_.get("vfs.s3.aws_access_key_id", &found);
1122   assert(found);
1123 
1124   auto aws_secret_access_key =
1125       config_.get("vfs.s3.aws_secret_access_key", &found);
1126   assert(found);
1127 
1128   auto aws_session_token = config_.get("vfs.s3.aws_session_token", &found);
1129   assert(found);
1130 
1131   auto aws_role_arn = config_.get("vfs.s3.aws_role_arn", &found);
1132   assert(found);
1133 
1134   auto aws_external_id = config_.get("vfs.s3.aws_external_id", &found);
1135   assert(found);
1136 
1137   auto aws_load_frequency = config_.get("vfs.s3.aws_load_frequency", &found);
1138   assert(found);
1139 
1140   auto aws_session_name = config_.get("vfs.s3.aws_session_name", &found);
1141   assert(found);
1142 
1143   int64_t connect_max_tries = 0;
1144   RETURN_NOT_OK(config_.get<int64_t>(
1145       "vfs.s3.connect_max_tries", &connect_max_tries, &found));
1146   assert(found);
1147 
1148   int64_t connect_scale_factor = 0;
1149   RETURN_NOT_OK(config_.get<int64_t>(
1150       "vfs.s3.connect_scale_factor", &connect_scale_factor, &found));
1151   assert(found);
1152 
1153   client_config.scheme = (s3_scheme == "http") ? Aws::Http::Scheme::HTTP :
1154                                                  Aws::Http::Scheme::HTTPS;
1155   client_config.connectTimeoutMs = (long)connect_timeout_ms;
1156   client_config.requestTimeoutMs = (long)request_timeout_ms;
1157   client_config.caFile = ca_file.c_str();
1158   client_config.caPath = ca_path.c_str();
1159   client_config.verifySSL = verify_ssl;
1160 
1161   client_config.retryStrategy = Aws::MakeShared<S3RetryStrategy>(
1162       constants::s3_allocation_tag.c_str(),
1163       stats_,
1164       connect_max_tries,
1165       connect_scale_factor);
1166 
1167 #ifdef __linux__
1168   // If the user has not set a s3 ca file or ca path then let's attempt to set
1169   // the cert file if we've autodetected it
1170   if (ca_file.empty() && ca_path.empty()) {
1171     const std::string cert_file =
1172         global_state::GlobalState::GetGlobalState().cert_file();
1173     if (!cert_file.empty()) {
1174       client_config.caFile = cert_file.c_str();
1175     }
1176   }
1177 #endif
1178 
1179   switch ((!aws_access_key_id.empty() ? 1 : 0) +
1180           (!aws_secret_access_key.empty() ? 2 : 0) +
1181           (!aws_role_arn.empty() ? 4 : 0)) {
1182     case 0:
1183       break;
1184     case 1:
1185     case 2:
1186       return Status::S3Error(
1187           "Insufficient authentication credentials; "
1188           "Both access key id and secret key are needed");
1189     case 3: {
1190       Aws::String access_key_id(aws_access_key_id.c_str());
1191       Aws::String secret_access_key(aws_secret_access_key.c_str());
1192       Aws::String session_token(
1193           !aws_session_token.empty() ? aws_session_token.c_str() : "");
1194       credentials_provider_ = tdb_make_shared(
1195           Aws::Auth::SimpleAWSCredentialsProvider,
1196           access_key_id,
1197           secret_access_key,
1198           session_token);
1199       break;
1200     }
1201     case 4: {
1202       // If AWS Role ARN provided instead of access_key and secret_key,
1203       // temporary credentials will be fetched by assuming this role.
1204       Aws::String role_arn(aws_role_arn.c_str());
1205       Aws::String external_id(
1206           !aws_external_id.empty() ? aws_external_id.c_str() : "");
1207       int load_frequency(
1208           !aws_load_frequency.empty() ?
1209               std::stoi(aws_load_frequency) :
1210               Aws::Auth::DEFAULT_CREDS_LOAD_FREQ_SECONDS);
1211       Aws::String session_name(
1212           !aws_session_name.empty() ? aws_session_name.c_str() : "");
1213       credentials_provider_ = tdb_make_shared(
1214           Aws::Auth::STSAssumeRoleCredentialsProvider,
1215           role_arn,
1216           session_name,
1217           external_id,
1218           load_frequency,
1219           nullptr);
1220       break;
1221     }
1222     default:
1223       return Status::S3Error(
1224           "Ambiguous authentication credentials; both permanent and temporary "
1225           "authentication credentials are configured");
1226   }
1227 
1228   // The `Aws::S3::S3Client` constructor is not thread-safe. Although we
1229   // currently hold `client_init_mtx_` that protects this routine from threads
1230   // on this instance of `S3`, it is not sufficient protection from threads on
1231   // another instance of `S3`. Use an additional, static mutex for this
1232   // scenario.
1233   static std::mutex static_client_init_mtx;
1234   {
1235     std::lock_guard<std::mutex> static_lck(static_client_init_mtx);
1236 
1237     if (credentials_provider_ == nullptr) {
1238       client_ = tdb_make_shared(
1239           Aws::S3::S3Client,
1240           *client_config_,
1241           Aws::Client::AWSAuthV4Signer::PayloadSigningPolicy::Never,
1242           use_virtual_addressing_);
1243     } else {
1244       client_ = tdb_make_shared(
1245           Aws::S3::S3Client,
1246           credentials_provider_.inner_sp(),
1247           *client_config_,
1248           Aws::Client::AWSAuthV4Signer::PayloadSigningPolicy::Never,
1249           use_virtual_addressing_);
1250     }
1251   }
1252 
1253   return Status::Ok();
1254 }
1255 
copy_object(const URI & old_uri,const URI & new_uri)1256 Status S3::copy_object(const URI& old_uri, const URI& new_uri) {
1257   RETURN_NOT_OK(init_client());
1258 
1259   Aws::Http::URI src_uri = old_uri.c_str();
1260   Aws::Http::URI dst_uri = new_uri.c_str();
1261   Aws::S3::Model::CopyObjectRequest copy_object_request;
1262   copy_object_request.SetCopySource(
1263       join_authority_and_path(
1264           src_uri.GetAuthority().c_str(), src_uri.GetPath().c_str())
1265           .c_str());
1266   copy_object_request.SetBucket(dst_uri.GetAuthority());
1267   copy_object_request.SetKey(dst_uri.GetPath());
1268   if (request_payer_ != Aws::S3::Model::RequestPayer::NOT_SET)
1269     copy_object_request.SetRequestPayer(request_payer_);
1270   if (sse_ != Aws::S3::Model::ServerSideEncryption::NOT_SET)
1271     copy_object_request.SetServerSideEncryption(sse_);
1272   if (!sse_kms_key_id_.empty())
1273     copy_object_request.SetSSEKMSKeyId(Aws::String(sse_kms_key_id_.c_str()));
1274   if (object_canned_acl_ != Aws::S3::Model::ObjectCannedACL::NOT_SET) {
1275     copy_object_request.SetACL(object_canned_acl_);
1276   }
1277 
1278   auto copy_object_outcome = client_->CopyObject(copy_object_request);
1279   if (!copy_object_outcome.IsSuccess()) {
1280     return LOG_STATUS(Status::S3Error(
1281         std::string("Failed to copy S3 object ") + old_uri.c_str() + " to " +
1282         new_uri.c_str() + outcome_error_message(copy_object_outcome)));
1283   }
1284 
1285   wait_for_object_to_propagate(
1286       copy_object_request.GetBucket(), copy_object_request.GetKey());
1287 
1288   return Status::Ok();
1289 }
1290 
fill_file_buffer(Buffer * buff,const void * buffer,uint64_t length,uint64_t * nbytes_filled)1291 Status S3::fill_file_buffer(
1292     Buffer* buff,
1293     const void* buffer,
1294     uint64_t length,
1295     uint64_t* nbytes_filled) {
1296   *nbytes_filled = std::min(file_buffer_size_ - buff->size(), length);
1297   if (*nbytes_filled > 0)
1298     RETURN_NOT_OK(buff->write(buffer, *nbytes_filled));
1299 
1300   return Status::Ok();
1301 }
1302 
add_front_slash(const std::string & path) const1303 std::string S3::add_front_slash(const std::string& path) const {
1304   return (path.front() != '/') ? (std::string("/") + path) : path;
1305 }
1306 
remove_front_slash(const std::string & path) const1307 std::string S3::remove_front_slash(const std::string& path) const {
1308   if (path.front() == '/')
1309     return path.substr(1, path.length());
1310   return path;
1311 }
1312 
remove_trailing_slash(const std::string & path) const1313 std::string S3::remove_trailing_slash(const std::string& path) const {
1314   if (path.back() == '/') {
1315     return path.substr(0, path.length() - 1);
1316   }
1317 
1318   return path;
1319 }
1320 
flush_file_buffer(const URI & uri,Buffer * buff,bool last_part)1321 Status S3::flush_file_buffer(const URI& uri, Buffer* buff, bool last_part) {
1322   RETURN_NOT_OK(init_client());
1323   if (buff->size() > 0) {
1324     const Status st =
1325         write_multipart(uri, buff->data(), buff->size(), last_part);
1326     buff->reset_size();
1327     RETURN_NOT_OK(st);
1328   }
1329 
1330   return Status::Ok();
1331 }
1332 
get_file_buffer(const URI & uri,Buffer ** buff)1333 Status S3::get_file_buffer(const URI& uri, Buffer** buff) {
1334   // TODO: remove this?
1335   std::unique_lock<std::mutex> lck(file_buffers_mtx_);
1336 
1337   auto uri_str = uri.to_string();
1338   auto it = file_buffers_.find(uri_str);
1339   if (it == file_buffers_.end()) {
1340     auto new_buff = tdb_new(Buffer);
1341     file_buffers_[uri_str] = new_buff;
1342     *buff = new_buff;
1343   } else {
1344     *buff = it->second;
1345   }
1346 
1347   return Status::Ok();
1348 }
1349 
initiate_multipart_request(Aws::Http::URI aws_uri,MultiPartUploadState * state)1350 Status S3::initiate_multipart_request(
1351     Aws::Http::URI aws_uri, MultiPartUploadState* state) {
1352   RETURN_NOT_OK(init_client());
1353 
1354   auto path = aws_uri.GetPath();
1355   std::string path_c_str = path.c_str();
1356   Aws::S3::Model::CreateMultipartUploadRequest multipart_upload_request;
1357   multipart_upload_request.SetBucket(aws_uri.GetAuthority());
1358   multipart_upload_request.SetKey(path);
1359   multipart_upload_request.SetContentType("application/octet-stream");
1360   if (request_payer_ != Aws::S3::Model::RequestPayer::NOT_SET)
1361     multipart_upload_request.SetRequestPayer(request_payer_);
1362   if (sse_ != Aws::S3::Model::ServerSideEncryption::NOT_SET)
1363     multipart_upload_request.SetServerSideEncryption(sse_);
1364   if (!sse_kms_key_id_.empty())
1365     multipart_upload_request.SetSSEKMSKeyId(
1366         Aws::String(sse_kms_key_id_.c_str()));
1367   if (object_canned_acl_ != Aws::S3::Model::ObjectCannedACL::NOT_SET) {
1368     multipart_upload_request.SetACL(object_canned_acl_);
1369   }
1370 
1371   auto multipart_upload_outcome =
1372       client_->CreateMultipartUpload(multipart_upload_request);
1373   if (!multipart_upload_outcome.IsSuccess()) {
1374     return LOG_STATUS(Status::S3Error(
1375         std::string("Failed to create multipart request for object '") +
1376         path_c_str + outcome_error_message(multipart_upload_outcome)));
1377   }
1378 
1379   state->part_number = 1;
1380   state->bucket = aws_uri.GetAuthority();
1381   state->key = path;
1382   state->upload_id = multipart_upload_outcome.GetResult().GetUploadId();
1383   state->completed_parts = std::map<int, Aws::S3::Model::CompletedPart>();
1384 
1385   *state = MultiPartUploadState(
1386       1,
1387       Aws::String(aws_uri.GetAuthority()),
1388       Aws::String(path),
1389       Aws::String(multipart_upload_outcome.GetResult().GetUploadId()),
1390       std::map<int, Aws::S3::Model::CompletedPart>());
1391 
1392   return Status::Ok();
1393 }
1394 
join_authority_and_path(const std::string & authority,const std::string & path) const1395 std::string S3::join_authority_and_path(
1396     const std::string& authority, const std::string& path) const {
1397   bool path_has_slash = !path.empty() && path.front() == '/';
1398   bool authority_has_slash = !authority.empty() && authority.back() == '/';
1399   bool need_slash = !(path_has_slash || authority_has_slash);
1400   return authority + (need_slash ? "/" : "") + path;
1401 }
1402 
wait_for_object_to_propagate(const Aws::String & bucket_name,const Aws::String & object_key) const1403 Status S3::wait_for_object_to_propagate(
1404     const Aws::String& bucket_name, const Aws::String& object_key) const {
1405   init_client();
1406 
1407   unsigned attempts_cnt = 0;
1408   while (attempts_cnt++ < constants::s3_max_attempts) {
1409     bool exists;
1410     RETURN_NOT_OK(is_object(bucket_name, object_key, &exists));
1411     if (exists) {
1412       return Status::Ok();
1413     }
1414 
1415     std::this_thread::sleep_for(
1416         std::chrono::milliseconds(constants::s3_attempt_sleep_ms));
1417   }
1418 
1419   return LOG_STATUS(Status::S3Error(
1420       "Failed waiting for object " +
1421       std::string(object_key.c_str(), object_key.size()) + " to be created."));
1422 }
1423 
wait_for_object_to_be_deleted(const Aws::String & bucket_name,const Aws::String & object_key) const1424 Status S3::wait_for_object_to_be_deleted(
1425     const Aws::String& bucket_name, const Aws::String& object_key) const {
1426   init_client();
1427 
1428   unsigned attempts_cnt = 0;
1429   while (attempts_cnt++ < constants::s3_max_attempts) {
1430     bool exists;
1431     RETURN_NOT_OK(is_object(bucket_name, object_key, &exists));
1432     if (!exists) {
1433       return Status::Ok();
1434     }
1435 
1436     std::this_thread::sleep_for(
1437         std::chrono::milliseconds(constants::s3_attempt_sleep_ms));
1438   }
1439 
1440   return LOG_STATUS(Status::S3Error(
1441       "Failed waiting for object " +
1442       std::string(object_key.c_str(), object_key.size()) + " to be deleted."));
1443 }
1444 
wait_for_bucket_to_be_created(const URI & bucket_uri) const1445 Status S3::wait_for_bucket_to_be_created(const URI& bucket_uri) const {
1446   init_client();
1447 
1448   unsigned attempts_cnt = 0;
1449   while (attempts_cnt++ < constants::s3_max_attempts) {
1450     bool exists;
1451     RETURN_NOT_OK(is_bucket(bucket_uri, &exists));
1452     if (exists) {
1453       return Status::Ok();
1454     }
1455 
1456     std::this_thread::sleep_for(
1457         std::chrono::milliseconds(constants::s3_attempt_sleep_ms));
1458   }
1459 
1460   return LOG_STATUS(Status::S3Error(
1461       "Failed waiting for bucket " + bucket_uri.to_string() +
1462       " to be created."));
1463 }
1464 
flush_direct(const URI & uri)1465 Status S3::flush_direct(const URI& uri) {
1466   RETURN_NOT_OK(init_client());
1467 
1468   // Get file buffer
1469   auto buff = (Buffer*)nullptr;
1470   RETURN_NOT_OK(get_file_buffer(uri, &buff));
1471 
1472   const Aws::Http::URI aws_uri(uri.c_str());
1473   const std::string uri_path(aws_uri.GetPath().c_str());
1474 
1475   Aws::S3::Model::PutObjectRequest put_object_request;
1476 
1477   auto stream = std::shared_ptr<Aws::IOStream>(
1478       new boost::interprocess::bufferstream((char*)buff->data(), buff->size()));
1479 
1480   put_object_request.SetBody(stream);
1481   put_object_request.SetContentLength(buff->size());
1482 
1483   // we only want to hash once, and must do it after setting the body
1484   auto md5_hash =
1485       Aws::Utils::HashingUtils::CalculateMD5(*put_object_request.GetBody());
1486 
1487   put_object_request.SetContentMD5(
1488       Aws::Utils::HashingUtils::Base64Encode(md5_hash));
1489   put_object_request.SetContentType("application/octet-stream");
1490   put_object_request.SetBucket(aws_uri.GetAuthority());
1491   put_object_request.SetKey(aws_uri.GetPath());
1492   if (request_payer_ != Aws::S3::Model::RequestPayer::NOT_SET)
1493     put_object_request.SetRequestPayer(request_payer_);
1494   if (sse_ != Aws::S3::Model::ServerSideEncryption::NOT_SET)
1495     put_object_request.SetServerSideEncryption(sse_);
1496   if (!sse_kms_key_id_.empty())
1497     put_object_request.SetSSEKMSKeyId(Aws::String(sse_kms_key_id_.c_str()));
1498   if (object_canned_acl_ != Aws::S3::Model::ObjectCannedACL::NOT_SET) {
1499     put_object_request.SetACL(object_canned_acl_);
1500   }
1501 
1502   auto put_object_outcome = client_->PutObject(put_object_request);
1503   if (!put_object_outcome.IsSuccess()) {
1504     return LOG_STATUS(Status::S3Error(
1505         std::string("Cannot write object '") + uri.c_str() +
1506         outcome_error_message(put_object_outcome)));
1507   }
1508 
1509   // verify the MD5 hash of the result
1510   // note the etag is hex-encoded not base64
1511   Aws::StringStream md5_hex;
1512   md5_hex << "\"" << Aws::Utils::HashingUtils::HexEncode(md5_hash) << "\"";
1513   if (md5_hex.str() != put_object_outcome.GetResult().GetETag()) {
1514     return LOG_STATUS(
1515         Status::S3Error("Object uploaded successfully, but MD5 hash does not "
1516                         "match result from server!' "));
1517   }
1518 
1519   wait_for_object_to_propagate(
1520       put_object_request.GetBucket(), put_object_request.GetKey());
1521 
1522   return Status::Ok();
1523 }
1524 
write_multipart(const URI & uri,const void * buffer,uint64_t length,bool last_part)1525 Status S3::write_multipart(
1526     const URI& uri, const void* buffer, uint64_t length, bool last_part) {
1527   RETURN_NOT_OK(init_client());
1528 
1529   // Ensure that each thread is responsible for exactly multipart_part_size_
1530   // bytes (except if this is the last write_multipart, in which case the final
1531   // thread should write less), and cap the number of parallel operations at the
1532   // configured max number. Length must be evenly divisible by
1533   // multipart_part_size_ unless this is the last part.
1534   uint64_t num_ops = last_part ?
1535                          utils::math::ceil(length, multipart_part_size_) :
1536                          (length / multipart_part_size_);
1537   num_ops = std::min(std::max(num_ops, uint64_t(1)), max_parallel_ops_);
1538 
1539   if (!last_part && length % multipart_part_size_ != 0) {
1540     return LOG_STATUS(
1541         Status::S3Error("Length not evenly divisible by part length"));
1542   }
1543 
1544   const Aws::Http::URI aws_uri(uri.c_str());
1545   const std::string uri_path(aws_uri.GetPath().c_str());
1546 
1547   MultiPartUploadState* state;
1548   std::unique_lock<std::mutex> state_lck;
1549 
1550   // Take a lock protecting the shared multipart data structures
1551   // Read lock to see if it exists
1552   UniqueReadLock unique_rl(&multipart_upload_rwlock_);
1553 
1554   auto state_iter = multipart_upload_states_.find(uri_path);
1555   if (state_iter == multipart_upload_states_.end()) {
1556     // If the state is new, we must grab write lock, so unlock from read and
1557     // grab write
1558     unique_rl.unlock();
1559     UniqueWriteLock unique_wl(&multipart_upload_rwlock_);
1560 
1561     // Since we switched locks we need to once again check to make sure another
1562     // thread didn't create the state
1563     state_iter = multipart_upload_states_.find(uri_path);
1564     if (state_iter == multipart_upload_states_.end()) {
1565       auto path = aws_uri.GetPath();
1566       std::string path_str = path.c_str();
1567       MultiPartUploadState new_state;
1568 
1569       assert(multipart_upload_states_.count(path_str) == 0);
1570       multipart_upload_states_.emplace(
1571           std::move(path_str), std::move(new_state));
1572       state = &multipart_upload_states_.at(uri_path);
1573       state_lck = std::unique_lock<std::mutex>(state->mtx);
1574       // Downgrade to read lock, expected below outside the create
1575       unique_wl.unlock();
1576 
1577       // Delete file if it exists (overwrite) and initiate multipart request
1578       bool exists;
1579       RETURN_NOT_OK(is_object(uri, &exists));
1580       if (exists) {
1581         RETURN_NOT_OK(remove_object(uri));
1582       }
1583 
1584       const Status st = initiate_multipart_request(aws_uri, state);
1585       if (!st.ok()) {
1586         return st;
1587       }
1588     } else {
1589       // If another thread switched state, switch back to a read lock
1590       state = &multipart_upload_states_.at(uri_path);
1591 
1592       // Lock multipart state
1593       state_lck = std::unique_lock<std::mutex>(state->mtx);
1594     }
1595   } else {
1596     state = &multipart_upload_states_.at(uri_path);
1597 
1598     // Lock multipart state
1599     state_lck = std::unique_lock<std::mutex>(state->mtx);
1600 
1601     // Unlock, as make_upload_part_req will reaquire as necessary.
1602     unique_rl.unlock();
1603   }
1604 
1605   // Get the upload ID
1606   const auto upload_id = state->upload_id;
1607 
1608   // Assign the part number(s), and make the write request.
1609   if (num_ops == 1) {
1610     const int part_num = state->part_number++;
1611     state_lck.unlock();
1612 
1613     auto ctx =
1614         make_upload_part_req(aws_uri, buffer, length, upload_id, part_num);
1615     return get_make_upload_part_req(uri, uri_path, ctx);
1616   } else {
1617     std::vector<MakeUploadPartCtx> ctx_vec;
1618     ctx_vec.reserve(num_ops);
1619     const uint64_t bytes_per_op = multipart_part_size_;
1620     const int part_num_base = state->part_number;
1621     for (uint64_t i = 0; i < num_ops; i++) {
1622       uint64_t begin = i * bytes_per_op,
1623                end = std::min((i + 1) * bytes_per_op - 1, length - 1);
1624       uint64_t thread_nbytes = end - begin + 1;
1625       auto thread_buffer = reinterpret_cast<const char*>(buffer) + begin;
1626       int part_num = static_cast<int>(part_num_base + i);
1627       auto ctx = make_upload_part_req(
1628           aws_uri, thread_buffer, thread_nbytes, upload_id, part_num);
1629       ctx_vec.emplace_back(std::move(ctx));
1630     }
1631     state->part_number += num_ops;
1632     state_lck.unlock();
1633 
1634     Status aggregate_st = Status::Ok();
1635     for (auto& ctx : ctx_vec) {
1636       const Status st = get_make_upload_part_req(uri, uri_path, ctx);
1637       if (!st.ok()) {
1638         aggregate_st = st;
1639       }
1640     }
1641 
1642     if (!aggregate_st.ok()) {
1643       std::stringstream errmsg;
1644       errmsg << "S3 parallel write multipart error; " << aggregate_st.message();
1645       LOG_STATUS(Status::S3Error(errmsg.str()));
1646     }
1647     return aggregate_st;
1648   }
1649 }
1650 
make_upload_part_req(const Aws::Http::URI & aws_uri,const void * const buffer,const uint64_t length,const Aws::String & upload_id,const int upload_part_num)1651 S3::MakeUploadPartCtx S3::make_upload_part_req(
1652     const Aws::Http::URI& aws_uri,
1653     const void* const buffer,
1654     const uint64_t length,
1655     const Aws::String& upload_id,
1656     const int upload_part_num) {
1657   auto stream = std::shared_ptr<Aws::IOStream>(
1658       new boost::interprocess::bufferstream((char*)buffer, length));
1659 
1660   Aws::S3::Model::UploadPartRequest upload_part_request;
1661   upload_part_request.SetBucket(aws_uri.GetAuthority());
1662   upload_part_request.SetKey(aws_uri.GetPath());
1663   upload_part_request.SetPartNumber(upload_part_num);
1664   upload_part_request.SetUploadId(upload_id);
1665   upload_part_request.SetBody(stream);
1666   upload_part_request.SetContentMD5(Aws::Utils::HashingUtils::Base64Encode(
1667       Aws::Utils::HashingUtils::CalculateMD5(*stream)));
1668   upload_part_request.SetContentLength(length);
1669   if (request_payer_ != Aws::S3::Model::RequestPayer::NOT_SET)
1670     upload_part_request.SetRequestPayer(request_payer_);
1671 
1672   auto upload_part_outcome_callable =
1673       client_->UploadPartCallable(upload_part_request);
1674 
1675   MakeUploadPartCtx ctx(
1676       std::move(upload_part_outcome_callable), upload_part_num);
1677   return ctx;
1678 }
1679 
get_make_upload_part_req(const URI & uri,const std::string & uri_path,MakeUploadPartCtx & ctx)1680 Status S3::get_make_upload_part_req(
1681     const URI& uri, const std::string& uri_path, MakeUploadPartCtx& ctx) {
1682   RETURN_NOT_OK(init_client());
1683 
1684   auto upload_part_outcome = ctx.upload_part_outcome_callable.get();
1685   bool success = upload_part_outcome.IsSuccess();
1686 
1687   static const UnitTestConfig& unit_test_cfg = UnitTestConfig::instance();
1688   if (unit_test_cfg.s3_fail_every_nth_upload_request.is_set() &&
1689       ctx.upload_part_num %
1690               unit_test_cfg.s3_fail_every_nth_upload_request.get() ==
1691           0) {
1692     success = false;
1693   }
1694 
1695   if (!success) {
1696     UniqueReadLock unique_rl(&multipart_upload_rwlock_);
1697     auto state = &multipart_upload_states_.at(uri_path);
1698     Status st = Status::S3Error(
1699         std::string("Failed to upload part of S3 object '") + uri.c_str() +
1700         outcome_error_message(upload_part_outcome));
1701     // Lock multipart state
1702     std::unique_lock<std::mutex> state_lck(state->mtx);
1703     unique_rl.unlock();
1704     state->st = st;
1705     return LOG_STATUS(st);
1706   }
1707 
1708   Aws::S3::Model::CompletedPart completed_part;
1709   completed_part.SetETag(upload_part_outcome.GetResult().GetETag());
1710   completed_part.SetPartNumber(ctx.upload_part_num);
1711 
1712   UniqueReadLock unique_rl(&multipart_upload_rwlock_);
1713   auto state = &multipart_upload_states_.at(uri_path);
1714   // Lock multipart state
1715   std::unique_lock<std::mutex> state_lck(state->mtx);
1716   unique_rl.unlock();
1717   state->completed_parts.emplace(
1718       ctx.upload_part_num, std::move(completed_part));
1719   state_lck.unlock();
1720 
1721   return Status::Ok();
1722 }
1723 
1724 }  // namespace sm
1725 }  // namespace tiledb
1726 
1727 #endif
1728