1 /**
2 * @file s3.cc
3 *
4 * @section LICENSE
5 *
6 * The MIT License
7 *
8 * @copyright Copyright (c) 2017-2021 TileDB, Inc.
9 *
10 * Permission is hereby granted, free of charge, to any person obtaining a copy
11 * of this software and associated documentation files (the "Software"), to deal
12 * in the Software without restriction, including without limitation the rights
13 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
14 * copies of the Software, and to permit persons to whom the Software is
15 * furnished to do so, subject to the following conditions:
16 *
17 * The above copyright notice and this permission notice shall be included in
18 * all copies or substantial portions of the Software.
19 *
20 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
21 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
22 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
23 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
24 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
25 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
26 * THE SOFTWARE.
27 *
28 * @section DESCRIPTION
29 *
30 * This file implements the S3 class.
31 */
32
33 #ifdef HAVE_S3
34
35 #include <aws/core/utils/logging/AWSLogging.h>
36 #include <aws/core/utils/logging/DefaultLogSystem.h>
37 #include <aws/core/utils/logging/LogLevel.h>
38 #include <aws/core/utils/memory/stl/AWSString.h>
39 #include <aws/s3/model/AbortMultipartUploadRequest.h>
40 #include <aws/s3/model/CreateMultipartUploadRequest.h>
41 #include <boost/interprocess/streams/bufferstream.hpp>
42 #include <fstream>
43 #include <iostream>
44
45 #include "tiledb/common/logger.h"
46 #include "tiledb/common/unique_rwlock.h"
47 #include "tiledb/sm/global_state/global_state.h"
48 #include "tiledb/sm/global_state/unit_test_config.h"
49 #include "tiledb/sm/misc/utils.h"
50
51 #ifdef _WIN32
52 #if !defined(NOMINMAX)
53 #define NOMINMAX
54 #endif
55 #include <Windows.h>
56 #undef GetMessage // workaround for
57 // https://github.com/aws/aws-sdk-cpp/issues/402
58 #endif
59
60 #include "tiledb/sm/filesystem/s3.h"
61 #include "tiledb/sm/misc/parallel_functions.h"
62
63 namespace {
64
aws_log_name_to_level(std::string loglevel)65 Aws::Utils::Logging::LogLevel aws_log_name_to_level(std::string loglevel) {
66 std::transform(loglevel.begin(), loglevel.end(), loglevel.begin(), ::tolower);
67 if (loglevel == "fatal")
68 return Aws::Utils::Logging::LogLevel::Fatal;
69 else if (loglevel == "error")
70 return Aws::Utils::Logging::LogLevel::Error;
71 else if (loglevel == "warn")
72 return Aws::Utils::Logging::LogLevel::Warn;
73 else if (loglevel == "info")
74 return Aws::Utils::Logging::LogLevel::Info;
75 else if (loglevel == "debug")
76 return Aws::Utils::Logging::LogLevel::Debug;
77 else if (loglevel == "trace")
78 return Aws::Utils::Logging::LogLevel::Trace;
79 else
80 return Aws::Utils::Logging::LogLevel::Off;
81 }
82
83 /**
84 * Return a S3 enum value for any recognized string or NOT_SET if
85 * B) the string is not recognized to match any of the enum values
86 *
87 * @param canned_acl_str A textual string naming one of the
88 * Aws::S3::Model::ObjectCannedACL enum members.
89 */
S3_ObjectCannedACL_from_str(const std::string & canned_acl_str)90 Aws::S3::Model::ObjectCannedACL S3_ObjectCannedACL_from_str(
91 const std::string& canned_acl_str) {
92 if (canned_acl_str.empty())
93 return Aws::S3::Model::ObjectCannedACL::NOT_SET;
94
95 if (canned_acl_str == "NOT_SET")
96 return Aws::S3::Model::ObjectCannedACL::NOT_SET;
97 else if (canned_acl_str == "private_")
98 return Aws::S3::Model::ObjectCannedACL::private_;
99 else if (canned_acl_str == "public_read")
100 return Aws::S3::Model::ObjectCannedACL::public_read;
101 else if (canned_acl_str == "public_read_write")
102 return Aws::S3::Model::ObjectCannedACL::public_read_write;
103 else if (canned_acl_str == "authenticated_read")
104 return Aws::S3::Model::ObjectCannedACL::authenticated_read;
105 else if (canned_acl_str == "aws_exec_read")
106 return Aws::S3::Model::ObjectCannedACL::aws_exec_read;
107 else if (canned_acl_str == "bucket_owner_read")
108 return Aws::S3::Model::ObjectCannedACL::bucket_owner_read;
109 else if (canned_acl_str == "bucket_owner_full_control")
110 return Aws::S3::Model::ObjectCannedACL::bucket_owner_full_control;
111 else
112 return Aws::S3::Model::ObjectCannedACL::NOT_SET;
113 }
114
115 /**
116 * Return a S3 enum value for any recognized string or NOT_SET if
117 * B) the string is not recognized to match any of the enum values
118 *
119 * @param canned_acl_str A textual string naming one of the
120 * Aws::S3::Model::BucketCannedACL enum members.
121 */
S3_BucketCannedACL_from_str(const std::string & canned_acl_str)122 Aws::S3::Model::BucketCannedACL S3_BucketCannedACL_from_str(
123 const std::string& canned_acl_str) {
124 if (canned_acl_str.empty())
125 return Aws::S3::Model::BucketCannedACL::NOT_SET;
126
127 if (canned_acl_str == "NOT_SET")
128 return Aws::S3::Model::BucketCannedACL::NOT_SET;
129 else if (canned_acl_str == "private_")
130 return Aws::S3::Model::BucketCannedACL::private_;
131 else if (canned_acl_str == "public_read")
132 return Aws::S3::Model::BucketCannedACL::public_read;
133 else if (canned_acl_str == "public_read_write")
134 return Aws::S3::Model::BucketCannedACL::public_read_write;
135 else if (canned_acl_str == "authenticated_read")
136 return Aws::S3::Model::BucketCannedACL::authenticated_read;
137 else
138 return Aws::S3::Model::BucketCannedACL::NOT_SET;
139 }
140
141 } // namespace
142
143 using namespace tiledb::common;
144
145 namespace tiledb {
146 namespace sm {
147
148 namespace {
149
150 /**
151 * Return the exception name and error message from the given outcome object.
152 *
153 * @tparam R AWS result type
154 * @tparam E AWS error type
155 * @param outcome Outcome to retrieve error message from
156 * @return Error message string
157 */
158 template <typename R, typename E>
outcome_error_message(const Aws::Utils::Outcome<R,E> & outcome)159 std::string outcome_error_message(const Aws::Utils::Outcome<R, E>& outcome) {
160 return std::string("\nException: ") +
161 outcome.GetError().GetExceptionName().c_str() +
162 std::string("\nError message: ") +
163 outcome.GetError().GetMessage().c_str();
164 }
165
166 } // namespace
167
168 /* ********************************* */
169 /* GLOBAL VARIABLES */
170 /* ********************************* */
171
172 /** Ensures that the AWS library is only initialized once per process. */
173 static std::once_flag aws_lib_initialized;
174
175 /* ********************************* */
176 /* CONSTRUCTORS & DESTRUCTORS */
177 /* ********************************* */
178
// Constructs an uninitialized S3 instance. All members get inert defaults;
// init() must be called before the instance can be used.
S3::S3()
    : stats_(nullptr)
    , state_(State::UNINITIALIZED)
    , credentials_provider_(nullptr)
    , file_buffer_size_(0)
    , max_parallel_ops_(1)
    , multipart_part_size_(0)
    , vfs_thread_pool_(nullptr)
    , use_virtual_addressing_(false)
    , use_multipart_upload_(true)
    , request_payer_(Aws::S3::Model::RequestPayer::NOT_SET)
    , sse_(Aws::S3::Model::ServerSideEncryption::NOT_SET)
    , object_canned_acl_(Aws::S3::Model::ObjectCannedACL::NOT_SET)
    , bucket_canned_acl_(Aws::S3::Model::BucketCannedACL::NOT_SET) {
}
194
~S3()195 S3::~S3() {
196 assert(state_ == State::DISCONNECTED);
197 for (auto& buff : file_buffers_)
198 tdb_delete(buff.second);
199 }
200
201 /* ********************************* */
202 /* API */
203 /* ********************************* */
204
/**
 * Initializes this S3 instance from the given config: sets up AWS SDK
 * options, once-per-process SDK init, logging, buffering/multipart
 * parameters, canned ACLs and server-side encryption. The S3 client itself
 * is created lazily later (by init_client()).
 *
 * @param parent_stats Parent stats node; a child "S3" node is created.
 * @param config Configuration parameters (vfs.s3.* keys must be present).
 * @param thread_pool Thread pool used for parallel S3 operations; must be
 *     non-null.
 * @return Status
 */
Status S3::init(
    stats::Stats* const parent_stats,
    const Config& config,
    ThreadPool* const thread_pool) {
  // A disconnected instance is treated as already initialized; calling
  // init() again after disconnect() is deliberately a no-op.
  if (state_ == State::DISCONNECTED)
    return Status::Ok();

  assert(state_ == State::UNINITIALIZED);

  stats_ = parent_stats->create_child("S3");

  if (thread_pool == nullptr) {
    return LOG_STATUS(
        Status::S3Error("Can't initialize with null thread pool."));
  }

  bool found = false;
  auto logging_level = config.get("vfs.s3.logging_level", &found);
  assert(found);

  options_.loggingOptions.logLevel = aws_log_name_to_level(logging_level);

  // By default, curl sets the signal handler for SIGPIPE to SIG_IGN while
  // executing. When curl is done executing, it restores the previous signal
  // handler. This is not thread safe, so the AWS SDK disables this behavior
  // in curl using the `CURLOPT_NOSIGNAL` option.
  // Here, we set the `installSigPipeHandler` AWS SDK option to `true` to allow
  // the AWS SDK to set its own signal handler to ignore SIGPIPE signals. A
  // SIGPIPE may be raised from the socket library when the peer disconnects
  // unexpectedly.
  options_.httpOptions.installSigPipeHandler = true;

  bool skip_init;
  RETURN_NOT_OK(config.get<bool>("vfs.s3.skip_init", &skip_init, &found));
  assert(found);

  // Initialize the library once per process.
  if (!skip_init)
    std::call_once(aws_lib_initialized, [this]() { Aws::InitAPI(options_); });

  // Route SDK logs to files with the "tiledb_s3_" prefix when enabled.
  if (options_.loggingOptions.logLevel != Aws::Utils::Logging::LogLevel::Off) {
    Aws::Utils::Logging::InitializeAWSLogging(
        Aws::MakeShared<Aws::Utils::Logging::DefaultLogSystem>(
            "TileDB", Aws::Utils::Logging::LogLevel::Trace, "tiledb_s3_"));
  }

  vfs_thread_pool_ = thread_pool;
  RETURN_NOT_OK(config.get<uint64_t>(
      "vfs.s3.max_parallel_ops", &max_parallel_ops_, &found));
  assert(found);
  RETURN_NOT_OK(config.get<uint64_t>(
      "vfs.s3.multipart_part_size", &multipart_part_size_, &found));
  assert(found);
  // One part-sized buffer per parallel operation.
  file_buffer_size_ = multipart_part_size_ * max_parallel_ops_;
  region_ = config.get("vfs.s3.region", &found);
  assert(found);
  RETURN_NOT_OK(config.get<bool>(
      "vfs.s3.use_virtual_addressing", &use_virtual_addressing_, &found));
  assert(found);
  RETURN_NOT_OK(config.get<bool>(
      "vfs.s3.use_multipart_upload", &use_multipart_upload_, &found));
  assert(found);

  bool request_payer;
  RETURN_NOT_OK(
      config.get<bool>("vfs.s3.requester_pays", &request_payer, &found));
  assert(found);

  if (request_payer)
    request_payer_ = Aws::S3::Model::RequestPayer::requester;

  // Canned ACLs: unrecognized/empty names fall back to NOT_SET.
  auto object_acl_str = config.get("vfs.s3.object_canned_acl", &found);
  assert(found);
  if (found) {
    object_canned_acl_ = S3_ObjectCannedACL_from_str(object_acl_str);
  }

  auto bucket_acl_str = config.get("vfs.s3.bucket_canned_acl", &found);
  assert(found);
  if (found) {
    bucket_canned_acl_ = S3_BucketCannedACL_from_str(bucket_acl_str);
  }

  // Server-side encryption: "" (off), "aes256", or "kms".
  auto sse = config.get("vfs.s3.sse", &found);
  assert(found);

  auto sse_kms_key_id = config.get("vfs.s3.sse_kms_key_id", &found);
  assert(found);

  if (!sse.empty()) {
    if (sse == "aes256") {
      sse_ = Aws::S3::Model::ServerSideEncryption::AES256;
    } else if (sse == "kms") {
      sse_ = Aws::S3::Model::ServerSideEncryption::aws_kms;
      sse_kms_key_id_ = sse_kms_key_id;
      if (sse_kms_key_id_.empty()) {
        return Status::S3Error(
            "Config parameter 'vfs.s3.sse_kms_key_id' must be set "
            "for kms server-side encryption.");
      }
    } else {
      return Status::S3Error(
          "Unknown 'vfs.s3.sse' config value " + sse +
          "; supported values are 'aes256' and 'kms'.");
    }
  }

  // Ensure `sse_kms_key_id` was only set for kms encryption.
  if (!sse_kms_key_id.empty() &&
      sse_ != Aws::S3::Model::ServerSideEncryption::aws_kms) {
    return Status::S3Error(
        "Config parameter 'vfs.s3.sse_kms_key_id' may only be "
        "set for 'vfs.s3.sse' == 'kms'.");
  }

  // Keep a copy of the config for the lazily-constructed client.
  config_ = config;

  state_ = State::INITIALIZED;
  return Status::Ok();
}
326
create_bucket(const URI & bucket) const327 Status S3::create_bucket(const URI& bucket) const {
328 RETURN_NOT_OK(init_client());
329
330 if (!bucket.is_s3()) {
331 return LOG_STATUS(Status::S3Error(
332 std::string("URI is not an S3 URI: " + bucket.to_string())));
333 }
334
335 Aws::Http::URI aws_uri = bucket.c_str();
336 Aws::S3::Model::CreateBucketRequest create_bucket_request;
337 create_bucket_request.SetBucket(aws_uri.GetAuthority());
338
339 // Set the bucket location constraint equal to the S3 region.
340 // Note: empty string and 'us-east-1' are parsing errors in the SDK.
341 if (!region_.empty() && region_ != "us-east-1") {
342 Aws::S3::Model::CreateBucketConfiguration cfg;
343 Aws::String region_str(region_.c_str());
344 auto location_constraint = Aws::S3::Model::BucketLocationConstraintMapper::
345 GetBucketLocationConstraintForName(region_str);
346 cfg.SetLocationConstraint(location_constraint);
347 create_bucket_request.SetCreateBucketConfiguration(cfg);
348 }
349
350 if (bucket_canned_acl_ != Aws::S3::Model::BucketCannedACL::NOT_SET) {
351 create_bucket_request.SetACL(bucket_canned_acl_);
352 }
353
354 auto create_bucket_outcome = client_->CreateBucket(create_bucket_request);
355 if (!create_bucket_outcome.IsSuccess()) {
356 return LOG_STATUS(Status::S3Error(
357 std::string("Failed to create S3 bucket ") + bucket.to_string() +
358 outcome_error_message(create_bucket_outcome)));
359 }
360
361 RETURN_NOT_OK(wait_for_bucket_to_be_created(bucket));
362
363 return Status::Ok();
364 }
365
remove_bucket(const URI & bucket) const366 Status S3::remove_bucket(const URI& bucket) const {
367 RETURN_NOT_OK(init_client());
368
369 // Empty bucket
370 RETURN_NOT_OK(empty_bucket(bucket));
371
372 // Delete bucket
373 Aws::Http::URI aws_uri = bucket.c_str();
374 Aws::S3::Model::DeleteBucketRequest delete_bucket_request;
375 delete_bucket_request.SetBucket(aws_uri.GetAuthority());
376 auto delete_bucket_outcome = client_->DeleteBucket(delete_bucket_request);
377 if (!delete_bucket_outcome.IsSuccess()) {
378 return LOG_STATUS(Status::S3Error(
379 std::string("Failed to remove S3 bucket ") + bucket.to_string() +
380 outcome_error_message(delete_bucket_outcome)));
381 }
382 return Status::Ok();
383 }
384
/**
 * Disconnects this S3 instance: finalizes (completes or aborts) every
 * outstanding multipart upload in parallel, shuts down AWS logging and the
 * SDK thread-pool executor, and transitions to State::DISCONNECTED.
 *
 * @return Status::Ok() on success, or the last finalization/shutdown error.
 */
Status S3::disconnect() {
  Status ret_st = Status::Ok();

  // Nothing to tear down if init() was never run.
  if (state_ == State::UNINITIALIZED) {
    return ret_st;
  }

  // Read-lock 'multipart_upload_states_'.
  UniqueReadLock unique_rl(&multipart_upload_rwlock_);

  if (multipart_upload_states_.size() > 0) {
    RETURN_NOT_OK(init_client());

    // Snapshot pointers to the states so they can be finalized in parallel.
    std::vector<const MultiPartUploadState*> states;
    states.reserve(multipart_upload_states_.size());
    for (auto& kv : multipart_upload_states_)
      states.emplace_back(&kv.second);

    // NOTE(review): 'ret_st' is written from concurrent tasks below without
    // synchronization — confirm parallel_for's guarantees or guard it.
    auto status =
        parallel_for(vfs_thread_pool_, 0, states.size(), [&](uint64_t i) {
          const MultiPartUploadState* state = states[i];
          // Lock multipart state
          std::unique_lock<std::mutex> state_lck(state->mtx);

          if (state->st.ok()) {
            // Healthy upload: complete it so the object becomes visible.
            Aws::S3::Model::CompleteMultipartUploadRequest complete_request =
                make_multipart_complete_request(*state);
            auto outcome = client_->CompleteMultipartUpload(complete_request);
            if (!outcome.IsSuccess()) {
              const Status st = LOG_STATUS(Status::S3Error(
                  std::string("Failed to disconnect and flush S3 objects. ") +
                  outcome_error_message(outcome)));
              if (!st.ok()) {
                ret_st = st;
              }
            }
          } else {
            // Failed upload: abort it so S3 releases the stored parts.
            Aws::S3::Model::AbortMultipartUploadRequest abort_request =
                make_multipart_abort_request(*state);
            auto outcome = client_->AbortMultipartUpload(abort_request);
            if (!outcome.IsSuccess()) {
              const Status st = LOG_STATUS(Status::S3Error(
                  std::string("Failed to disconnect and flush S3 objects. ") +
                  outcome_error_message(outcome)));
              if (!st.ok()) {
                ret_st = st;
              }
            }
          }
          return Status::Ok();
        });

    RETURN_NOT_OK(status);
  }

  unique_rl.unlock();

  if (options_.loggingOptions.logLevel != Aws::Utils::Logging::LogLevel::Off) {
    Aws::Utils::Logging::ShutdownAWSLogging();
  }

  // Stop the SDK's thread-pool executor, if one was created.
  if (s3_tp_executor_) {
    const Status st = s3_tp_executor_->Stop();
    if (!st.ok()) {
      ret_st = st;
    }
  }

  state_ = State::DISCONNECTED;
  return ret_st;
}
456
empty_bucket(const URI & bucket) const457 Status S3::empty_bucket(const URI& bucket) const {
458 RETURN_NOT_OK(init_client());
459
460 auto uri_dir = bucket.add_trailing_slash();
461 return remove_dir(uri_dir);
462 }
463
/**
 * Flushes the write buffer for `uri` and finalizes its multipart upload:
 * on success the upload is completed (object becomes visible), otherwise
 * it is aborted. Without multipart uploads, falls back to a single PUT.
 *
 * @param uri The S3 object URI to flush.
 * @return Status
 */
Status S3::flush_object(const URI& uri) {
  RETURN_NOT_OK(init_client());
  // Without multipart uploads the buffered bytes are written with a
  // single direct write instead.
  if (!use_multipart_upload_) {
    return flush_direct(uri);
  }
  if (!uri.is_s3()) {
    return LOG_STATUS(Status::S3Error(
        std::string("URI is not an S3 URI: " + uri.to_string())));
  }

  // Flush and delete file buffer. For multipart requests, we must
  // continue even if 'flush_file_buffer' fails. In that scenario,
  // we will send an abort request.
  auto buff = (Buffer*)nullptr;
  RETURN_NOT_OK(get_file_buffer(uri, &buff));
  const Status flush_st = flush_file_buffer(uri, buff, true);

  Aws::Http::URI aws_uri = uri.c_str();
  std::string path_c_str = aws_uri.GetPath().c_str();

  // Take a lock protecting 'multipart_upload_states_'.
  UniqueReadLock unique_rl(&multipart_upload_rwlock_);

  // No multipart state exists: nothing was ever uploaded (empty object);
  // surface any buffer-flush error and return.
  auto state_iter = multipart_upload_states_.find(path_c_str);
  if (state_iter == multipart_upload_states_.end()) {
    RETURN_NOT_OK(flush_st);
    return Status::Ok();
  }

  const MultiPartUploadState* state = &multipart_upload_states_.at(path_c_str);
  // Lock multipart state; only then is it safe to drop the map-wide lock.
  std::unique_lock<std::mutex> state_lck(state->mtx);
  unique_rl.unlock();

  if (state->st.ok()) {
    // All parts uploaded successfully: complete the multipart upload.
    Aws::S3::Model::CompleteMultipartUploadRequest complete_request =
        make_multipart_complete_request(*state);
    auto outcome = client_->CompleteMultipartUpload(complete_request);
    if (!outcome.IsSuccess()) {
      return LOG_STATUS(Status::S3Error(
          std::string("Failed to flush S3 object ") + uri.c_str() +
          outcome_error_message(outcome)));
    }

    // Copy out what we need before releasing the state lock.
    auto bucket = state->bucket;
    auto key = state->key;
    // It is safe to unlock the state here
    state_lck.unlock();

    // S3 listings are eventually consistent; wait until the object shows up.
    wait_for_object_to_propagate(move(bucket), move(key));

    return finish_flush_object(std::move(outcome), uri, buff);
  } else {
    // A part failed earlier: abort so S3 releases the stored parts.
    Aws::S3::Model::AbortMultipartUploadRequest abort_request =
        make_multipart_abort_request(*state);

    auto outcome = client_->AbortMultipartUpload(abort_request);

    state_lck.unlock();

    return finish_flush_object(std::move(outcome), uri, buff);
  }
}
528
529 Aws::S3::Model::CompleteMultipartUploadRequest
make_multipart_complete_request(const MultiPartUploadState & state)530 S3::make_multipart_complete_request(const MultiPartUploadState& state) {
531 // Add all the completed parts (sorted by part number) to the upload object.
532 Aws::S3::Model::CompletedMultipartUpload completed_upload;
533 for (auto& tup : state.completed_parts) {
534 const Aws::S3::Model::CompletedPart& part = std::get<1>(tup);
535 completed_upload.AddParts(part);
536 }
537
538 Aws::S3::Model::CompleteMultipartUploadRequest complete_request;
539 complete_request.SetBucket(state.bucket);
540 complete_request.SetKey(state.key);
541 complete_request.SetUploadId(state.upload_id);
542 if (request_payer_ != Aws::S3::Model::RequestPayer::NOT_SET)
543 complete_request.SetRequestPayer(request_payer_);
544 return complete_request.WithMultipartUpload(std::move(completed_upload));
545 }
546
make_multipart_abort_request(const MultiPartUploadState & state)547 Aws::S3::Model::AbortMultipartUploadRequest S3::make_multipart_abort_request(
548 const MultiPartUploadState& state) {
549 Aws::S3::Model::AbortMultipartUploadRequest abort_request;
550 abort_request.SetBucket(state.bucket);
551 abort_request.SetKey(state.key);
552 abort_request.SetUploadId(state.upload_id);
553 if (request_payer_ != Aws::S3::Model::RequestPayer::NOT_SET)
554 abort_request.SetRequestPayer(request_payer_);
555 return abort_request;
556 }
557
558 template <typename R, typename E>
finish_flush_object(const Aws::Utils::Outcome<R,E> & outcome,const URI & uri,Buffer * const buff)559 Status S3::finish_flush_object(
560 const Aws::Utils::Outcome<R, E>& outcome,
561 const URI& uri,
562 Buffer* const buff) {
563 Aws::Http::URI aws_uri = uri.c_str();
564
565 UniqueWriteLock unique_wl(&multipart_upload_rwlock_);
566 multipart_upload_states_.erase(aws_uri.GetPath().c_str());
567 unique_wl.unlock();
568
569 std::unique_lock<std::mutex> file_buffers_lck(file_buffers_mtx_);
570 file_buffers_.erase(uri.to_string());
571 file_buffers_lck.unlock();
572 tdb_delete(buff);
573
574 if (!outcome.IsSuccess()) {
575 return LOG_STATUS(Status::S3Error(
576 std::string("Failed to flush S3 object ") + uri.c_str() +
577 outcome_error_message(outcome)));
578 }
579
580 return Status::Ok();
581 }
582
is_empty_bucket(const URI & bucket,bool * is_empty) const583 Status S3::is_empty_bucket(const URI& bucket, bool* is_empty) const {
584 RETURN_NOT_OK(init_client());
585
586 bool exists;
587 RETURN_NOT_OK(is_bucket(bucket, &exists));
588 if (!exists)
589 return LOG_STATUS(Status::S3Error(
590 "Cannot check if bucket is empty; Bucket does not exist"));
591
592 Aws::Http::URI aws_uri = bucket.c_str();
593 Aws::S3::Model::ListObjectsRequest list_objects_request;
594 list_objects_request.SetBucket(aws_uri.GetAuthority());
595 list_objects_request.SetPrefix("");
596 list_objects_request.SetDelimiter("/");
597 if (request_payer_ != Aws::S3::Model::RequestPayer::NOT_SET)
598 list_objects_request.SetRequestPayer(request_payer_);
599 auto list_objects_outcome = client_->ListObjects(list_objects_request);
600
601 if (!list_objects_outcome.IsSuccess()) {
602 return LOG_STATUS(Status::S3Error(
603 std::string("Failed to list s3 objects in bucket ") + bucket.c_str() +
604 outcome_error_message(list_objects_outcome)));
605 }
606
607 *is_empty = list_objects_outcome.GetResult().GetContents().empty() &&
608 list_objects_outcome.GetResult().GetCommonPrefixes().empty();
609
610 return Status::Ok();
611 }
612
is_bucket(const URI & uri,bool * const exists) const613 Status S3::is_bucket(const URI& uri, bool* const exists) const {
614 init_client();
615
616 if (!uri.is_s3()) {
617 return LOG_STATUS(Status::S3Error(
618 std::string("URI is not an S3 URI: " + uri.to_string())));
619 }
620
621 Aws::Http::URI aws_uri = uri.c_str();
622 Aws::S3::Model::HeadBucketRequest head_bucket_request;
623 head_bucket_request.SetBucket(aws_uri.GetAuthority());
624 auto head_bucket_outcome = client_->HeadBucket(head_bucket_request);
625 *exists = head_bucket_outcome.IsSuccess();
626
627 return Status::Ok();
628 }
629
is_object(const URI & uri,bool * const exists) const630 Status S3::is_object(const URI& uri, bool* const exists) const {
631 init_client();
632
633 if (!uri.is_s3()) {
634 return LOG_STATUS(Status::S3Error(
635 std::string("URI is not an S3 URI: " + uri.to_string())));
636 }
637
638 Aws::Http::URI aws_uri = uri.c_str();
639
640 return is_object(aws_uri.GetAuthority(), aws_uri.GetPath(), exists);
641 }
642
is_object(const Aws::String & bucket_name,const Aws::String & object_key,bool * const exists) const643 Status S3::is_object(
644 const Aws::String& bucket_name,
645 const Aws::String& object_key,
646 bool* const exists) const {
647 init_client();
648
649 Aws::S3::Model::HeadObjectRequest head_object_request;
650 head_object_request.SetBucket(bucket_name);
651 head_object_request.SetKey(object_key);
652 if (request_payer_ != Aws::S3::Model::RequestPayer::NOT_SET)
653 head_object_request.SetRequestPayer(request_payer_);
654 auto head_object_outcome = client_->HeadObject(head_object_request);
655 *exists = head_object_outcome.IsSuccess();
656
657 return Status::Ok();
658 }
659
is_dir(const URI & uri,bool * exists) const660 Status S3::is_dir(const URI& uri, bool* exists) const {
661 RETURN_NOT_OK(init_client());
662
663 // Potentially add `/` to the end of `uri`
664 auto uri_dir = uri.add_trailing_slash();
665 std::vector<std::string> paths;
666 RETURN_NOT_OK(ls(uri_dir, &paths, "/", 1));
667 *exists = (bool)paths.size();
668 return Status::Ok();
669 }
670
/**
 * Lists objects and common prefixes under `prefix`, appending full s3://
 * URIs to `paths`. Pages through ListObjects results until the listing is
 * exhausted or `max_paths` entries have been collected.
 *
 * @param prefix S3 URI prefix to list under (a trailing '/' is appended).
 * @param paths Output vector; results are appended, not cleared.
 * @param delimiter Grouping delimiter ("/" groups into directories; ""
 *     lists all keys recursively).
 * @param max_paths Maximum number of results, or -1 for unlimited.
 * @return Status
 */
Status S3::ls(
    const URI& prefix,
    std::vector<std::string>* paths,
    const std::string& delimiter,
    int max_paths) const {
  RETURN_NOT_OK(init_client());

  const auto prefix_dir = prefix.add_trailing_slash();

  auto prefix_str = prefix_dir.to_string();
  if (!prefix_dir.is_s3()) {
    return LOG_STATUS(
        Status::S3Error(std::string("URI is not an S3 URI: " + prefix_str)));
  }

  // Split the URI into the bucket (authority) and the key prefix.
  Aws::Http::URI aws_uri = prefix_str.c_str();
  auto aws_prefix = remove_front_slash(aws_uri.GetPath().c_str());
  std::string aws_auth = aws_uri.GetAuthority().c_str();
  Aws::S3::Model::ListObjectsRequest list_objects_request;
  list_objects_request.SetBucket(aws_uri.GetAuthority());
  list_objects_request.SetPrefix(aws_prefix.c_str());
  list_objects_request.SetDelimiter(delimiter.c_str());
  if (request_payer_ != Aws::S3::Model::RequestPayer::NOT_SET)
    list_objects_request.SetRequestPayer(request_payer_);

  bool is_done = false;
  while (!is_done) {
    // Not requesting more items than needed
    if (max_paths != -1)
      list_objects_request.SetMaxKeys(
          max_paths - static_cast<int>(paths->size()));
    auto list_objects_outcome = client_->ListObjects(list_objects_request);

    if (!list_objects_outcome.IsSuccess())
      return LOG_STATUS(Status::S3Error(
          std::string("Error while listing with prefix '") + prefix_str +
          "' and delimiter '" + delimiter + "'" +
          outcome_error_message(list_objects_outcome)));

    // Objects ("files") directly matched by the prefix.
    for (const auto& object : list_objects_outcome.GetResult().GetContents()) {
      std::string file(object.GetKey().c_str());
      paths->push_back("s3://" + aws_auth + add_front_slash(file));
    }

    // Common prefixes ("directories") grouped by the delimiter.
    for (const auto& object :
         list_objects_outcome.GetResult().GetCommonPrefixes()) {
      std::string file(object.GetPrefix().c_str());
      paths->push_back(
          "s3://" + aws_auth + add_front_slash(remove_trailing_slash(file)));
    }

    is_done =
        !list_objects_outcome.GetResult().GetIsTruncated() ||
        (max_paths != -1 && paths->size() >= static_cast<size_t>(max_paths));
    if (!is_done) {
      // The documentation states that "GetNextMarker" will be non-empty only
      // when the delimiter in the request is non-empty. When the delimiter is
      // empty, we must use the last returned key as the next marker.
      assert(
          !delimiter.empty() ||
          !list_objects_outcome.GetResult().GetContents().empty());
      Aws::String next_marker =
          !delimiter.empty() ?
              list_objects_outcome.GetResult().GetNextMarker() :
              list_objects_outcome.GetResult().GetContents().back().GetKey();
      assert(!next_marker.empty());

      list_objects_request.SetMarker(std::move(next_marker));
    }
  }

  return Status::Ok();
}
744
move_object(const URI & old_uri,const URI & new_uri)745 Status S3::move_object(const URI& old_uri, const URI& new_uri) {
746 RETURN_NOT_OK(init_client());
747
748 RETURN_NOT_OK(copy_object(old_uri, new_uri));
749 RETURN_NOT_OK(remove_object(old_uri));
750 return Status::Ok();
751 }
752
move_dir(const URI & old_uri,const URI & new_uri)753 Status S3::move_dir(const URI& old_uri, const URI& new_uri) {
754 RETURN_NOT_OK(init_client());
755
756 std::vector<std::string> paths;
757 RETURN_NOT_OK(ls(old_uri, &paths, ""));
758 for (const auto& path : paths) {
759 auto suffix = path.substr(old_uri.to_string().size());
760 auto new_path = new_uri.join_path(suffix);
761 RETURN_NOT_OK(move_object(URI(path), URI(new_path)));
762 }
763
764 return Status::Ok();
765 }
766
copy_file(const URI & old_uri,const URI & new_uri)767 Status S3::copy_file(const URI& old_uri, const URI& new_uri) {
768 RETURN_NOT_OK(init_client());
769
770 RETURN_NOT_OK(copy_object(old_uri, new_uri));
771 return Status::Ok();
772 }
773
copy_dir(const URI & old_uri,const URI & new_uri)774 Status S3::copy_dir(const URI& old_uri, const URI& new_uri) {
775 RETURN_NOT_OK(init_client());
776
777 std::string old_uri_string = old_uri.to_string();
778 std::vector<std::string> paths;
779 RETURN_NOT_OK(ls(old_uri, &paths));
780 while (!paths.empty()) {
781 std::string file_name_abs = paths.front();
782 URI file_name_uri = URI(file_name_abs);
783 std::string file_name = file_name_abs.substr(old_uri_string.length());
784 paths.erase(paths.begin());
785
786 bool dir_exists;
787 RETURN_NOT_OK(is_dir(file_name_uri, &dir_exists));
788 if (dir_exists) {
789 std::vector<std::string> child_paths;
790 RETURN_NOT_OK(ls(file_name_uri, &child_paths));
791 paths.insert(paths.end(), child_paths.begin(), child_paths.end());
792 } else {
793 std::string new_path_string = new_uri.to_string() + file_name;
794 URI new_path_uri = URI(new_path_string);
795 RETURN_NOT_OK(copy_object(file_name_uri, new_path_uri));
796 }
797 }
798
799 return Status::Ok();
800 }
801
object_size(const URI & uri,uint64_t * nbytes) const802 Status S3::object_size(const URI& uri, uint64_t* nbytes) const {
803 RETURN_NOT_OK(init_client());
804
805 if (!uri.is_s3()) {
806 return LOG_STATUS(Status::S3Error(
807 std::string("URI is not an S3 URI: " + uri.to_string())));
808 }
809
810 Aws::Http::URI aws_uri = uri.to_string().c_str();
811 auto aws_path = remove_front_slash(aws_uri.GetPath().c_str());
812
813 Aws::S3::Model::HeadObjectRequest head_object_request;
814 head_object_request.SetBucket(aws_uri.GetAuthority());
815 head_object_request.SetKey(aws_path.c_str());
816 if (request_payer_ != Aws::S3::Model::RequestPayer::NOT_SET)
817 head_object_request.SetRequestPayer(request_payer_);
818 auto head_object_outcome = client_->HeadObject(head_object_request);
819
820 if (!head_object_outcome.IsSuccess())
821 return LOG_STATUS(Status::S3Error(
822 "Cannot retrieve S3 object size; Error while listing file " +
823 uri.to_string() + outcome_error_message(head_object_outcome)));
824 *nbytes =
825 static_cast<uint64_t>(head_object_outcome.GetResult().GetContentLength());
826
827 return Status::Ok();
828 }
829
/**
 * Reads `length` bytes (plus up to `read_ahead_length` extra) starting at
 * `offset` from the object at `uri`, directly into `buffer`.
 *
 * @param uri S3 object to read from.
 * @param offset Byte offset to start reading at.
 * @param buffer Destination; must have room for `length + read_ahead_length`
 *     bytes, since the response is streamed straight into it.
 * @param length Number of bytes that must be read for success.
 * @param read_ahead_length Extra bytes to request; allowed to come back
 *     short (e.g. at end of object).
 * @param length_returned Output: number of bytes actually read.
 * @return Status; errors if fewer than `length` bytes were returned.
 */
Status S3::read(
    const URI& uri,
    const off_t offset,
    void* const buffer,
    const uint64_t length,
    const uint64_t read_ahead_length,
    uint64_t* const length_returned) const {
  RETURN_NOT_OK(init_client());

  if (!uri.is_s3()) {
    return LOG_STATUS(Status::S3Error(
        std::string("URI is not an S3 URI: " + uri.to_string())));
  }

  Aws::Http::URI aws_uri = uri.c_str();
  Aws::S3::Model::GetObjectRequest get_object_request;
  get_object_request.WithBucket(aws_uri.GetAuthority())
      .WithKey(aws_uri.GetPath());
  // The HTTP Range header is inclusive on both ends, hence the '- 1'.
  get_object_request.SetRange(
      ("bytes=" + std::to_string(offset) + "-" +
       std::to_string(offset + length + read_ahead_length - 1))
          .c_str());
  // Stream the response body directly into the caller's buffer instead of
  // letting the SDK allocate its own stringstream.
  get_object_request.SetResponseStreamFactory(
      [buffer, length, read_ahead_length]() {
        return Aws::New<PreallocatedIOStream>(
            constants::s3_allocation_tag.c_str(),
            buffer,
            length + read_ahead_length);
      });

  if (request_payer_ != Aws::S3::Model::RequestPayer::NOT_SET)
    get_object_request.SetRequestPayer(request_payer_);

  auto get_object_outcome = client_->GetObject(get_object_request);
  if (!get_object_outcome.IsSuccess()) {
    return LOG_STATUS(Status::S3Error(
        std::string("Failed to read S3 object ") + uri.c_str() +
        outcome_error_message(get_object_outcome)));
  }

  // Fewer than `length` bytes means the mandatory part of the range was not
  // satisfied; only the read-ahead portion may legitimately come back short.
  *length_returned =
      static_cast<uint64_t>(get_object_outcome.GetResult().GetContentLength());
  if (*length_returned < length) {
    return LOG_STATUS(Status::S3Error(
        std::string("Read operation returned different size of bytes ") +
        std::to_string(*length_returned) + " vs " + std::to_string(length)));
  }

  return Status::Ok();
}
880
remove_object(const URI & uri) const881 Status S3::remove_object(const URI& uri) const {
882 RETURN_NOT_OK(init_client());
883
884 if (!uri.is_s3()) {
885 return LOG_STATUS(Status::S3Error(
886 std::string("URI is not an S3 URI: " + uri.to_string())));
887 }
888
889 Aws::Http::URI aws_uri = uri.to_string().c_str();
890 Aws::S3::Model::DeleteObjectRequest delete_object_request;
891 delete_object_request.SetBucket(aws_uri.GetAuthority());
892 delete_object_request.SetKey(aws_uri.GetPath());
893 if (request_payer_ != Aws::S3::Model::RequestPayer::NOT_SET)
894 delete_object_request.SetRequestPayer(request_payer_);
895
896 auto delete_object_outcome = client_->DeleteObject(delete_object_request);
897 if (!delete_object_outcome.IsSuccess()) {
898 return LOG_STATUS(Status::S3Error(
899 std::string("Failed to delete S3 object '") + uri.c_str() +
900 outcome_error_message(delete_object_outcome)));
901 }
902
903 wait_for_object_to_be_deleted(
904 delete_object_request.GetBucket(), delete_object_request.GetKey());
905 return Status::Ok();
906 }
907
remove_dir(const URI & uri) const908 Status S3::remove_dir(const URI& uri) const {
909 RETURN_NOT_OK(init_client());
910
911 std::vector<std::string> paths;
912 auto uri_dir = uri.add_trailing_slash();
913 RETURN_NOT_OK(ls(uri_dir, &paths, ""));
914 for (const auto& p : paths)
915 RETURN_NOT_OK(remove_object(URI(p)));
916 return Status::Ok();
917 }
918
touch(const URI & uri) const919 Status S3::touch(const URI& uri) const {
920 RETURN_NOT_OK(init_client());
921
922 if (!uri.is_s3()) {
923 return LOG_STATUS(Status::S3Error(std::string(
924 "Cannot create file; URI is not an S3 URI: " + uri.to_string())));
925 }
926
927 if (uri.to_string().back() == '/') {
928 return LOG_STATUS(Status::S3Error(std::string(
929 "Cannot create file; URI is a directory: " + uri.to_string())));
930 }
931
932 bool exists;
933 RETURN_NOT_OK(is_object(uri, &exists));
934 if (exists) {
935 return Status::Ok();
936 }
937
938 Aws::Http::URI aws_uri = uri.c_str();
939 Aws::S3::Model::PutObjectRequest put_object_request;
940 put_object_request.WithKey(aws_uri.GetPath())
941 .WithBucket(aws_uri.GetAuthority());
942
943 auto request_stream =
944 Aws::MakeShared<Aws::StringStream>(constants::s3_allocation_tag.c_str());
945 put_object_request.SetBody(request_stream);
946 if (request_payer_ != Aws::S3::Model::RequestPayer::NOT_SET)
947 put_object_request.SetRequestPayer(request_payer_);
948 if (sse_ != Aws::S3::Model::ServerSideEncryption::NOT_SET)
949 put_object_request.SetServerSideEncryption(sse_);
950 if (!sse_kms_key_id_.empty())
951 put_object_request.SetSSEKMSKeyId(Aws::String(sse_kms_key_id_.c_str()));
952 if (object_canned_acl_ != Aws::S3::Model::ObjectCannedACL::NOT_SET) {
953 put_object_request.SetACL(object_canned_acl_);
954 }
955
956 auto put_object_outcome = client_->PutObject(put_object_request);
957 if (!put_object_outcome.IsSuccess()) {
958 return LOG_STATUS(Status::S3Error(
959 std::string("Cannot touch object '") + uri.c_str() +
960 outcome_error_message(put_object_outcome)));
961 }
962
963 wait_for_object_to_propagate(
964 put_object_request.GetBucket(), put_object_request.GetKey());
965
966 return Status::Ok();
967 }
968
/**
 * Buffers `length` bytes for the object at `uri`, flushing full buffers as
 * multipart upload parts when multipart upload is enabled.
 *
 * In multipart mode, data accumulates in a per-URI file buffer; whenever the
 * buffer fills to `file_buffer_size_` it is uploaded as one part. The final
 * part is only ever uploaded via flush_object(), never here. In direct-write
 * mode, all data must fit in the buffer and is uploaded later in one PUT.
 *
 * @param uri The destination S3 URI.
 * @param buffer The bytes to write.
 * @param length Number of bytes in `buffer`.
 * @return Status error if the URI is invalid, the buffer overflows in
 *     direct-write mode, or a part upload fails.
 */
Status S3::write(const URI& uri, const void* buffer, uint64_t length) {
  RETURN_NOT_OK(init_client());

  if (!uri.is_s3()) {
    return LOG_STATUS(Status::S3Error(
        std::string("URI is not an S3 URI: " + uri.to_string())));
  }

  // This write is never considered the last part of an object. The last part is
  // only uploaded with flush_object().
  const bool is_last_part = false;

  // Get (or lazily create) the per-URI file buffer.
  auto buff = (Buffer*)nullptr;
  RETURN_NOT_OK(get_file_buffer(uri, &buff));

  // Fill file buffer with as much of the input as fits in the remaining
  // buffer capacity; `nbytes_filled` may be less than `length`.
  uint64_t nbytes_filled;
  RETURN_NOT_OK(fill_file_buffer(buff, buffer, length, &nbytes_filled));

  // In direct-write mode there is no mechanism to flush mid-write, so a
  // partial fill means the buffer capacity was exceeded — hard error.
  if ((!use_multipart_upload_) && (nbytes_filled != length)) {
    std::stringstream errmsg;
    errmsg << "Direct write failed! " << nbytes_filled
           << " bytes written to buffer, " << length << " bytes requested.";
    return LOG_STATUS(Status::S3Error(errmsg.str()));
  }

  // Flush file buffer
  // multipart objects will flush whenever the writes exceed file_buffer_size_
  // write_direct should just append to buffer and upload later
  if (use_multipart_upload_) {
    if (buff->size() == file_buffer_size_)
      RETURN_NOT_OK(flush_file_buffer(uri, buff, is_last_part));

    // Upload any remaining input: full buffer-sized chunks go straight to
    // write_multipart(); the final partial chunk is re-buffered.
    uint64_t new_length = length - nbytes_filled;
    uint64_t offset = nbytes_filled;
    // Write chunks
    while (new_length > 0) {
      if (new_length >= file_buffer_size_) {
        RETURN_NOT_OK(write_multipart(
            uri, (char*)buffer + offset, file_buffer_size_, is_last_part));
        offset += file_buffer_size_;
        new_length -= file_buffer_size_;
      } else {
        // The buffer is empty here (it was flushed above when full), so this
        // fill consumes the entire remainder.
        RETURN_NOT_OK(fill_file_buffer(
            buff, (char*)buffer + offset, new_length, &nbytes_filled));
        offset += nbytes_filled;
        new_length -= nbytes_filled;
      }
    }
    // Every input byte must have been either uploaded or buffered.
    assert(offset == length);
  }

  return Status::Ok();
}
1024
1025 /* ********************************* */
1026 /* PRIVATE METHODS */
1027 /* ********************************* */
1028
/**
 * Lazily initializes the AWS S3 client on first use.
 *
 * Reads all `vfs.s3.*` configuration parameters, builds the client
 * configuration (endpoint, proxy, timeouts, TLS, retry strategy), selects a
 * credentials provider (static keys, assumed role, or the SDK default chain),
 * and constructs the `Aws::S3::S3Client`. If the client already exists, only
 * validates that its credentials have not expired.
 *
 * Thread-safety: guarded by `client_init_mtx_` per instance, plus a static
 * mutex around the (non-thread-safe) S3Client constructor across instances.
 *
 * @return Status error on expired credentials, invalid config values, or
 *     an ambiguous/insufficient credential combination.
 */
Status S3::init_client() const {
  assert(state_ == State::INITIALIZED);

  std::lock_guard<std::mutex> lck(client_init_mtx_);

  if (client_ != nullptr) {
    // Client already initialized: only check credentials. If expired or
    // empty, report an error so the caller does not issue doomed requests.
    if (credentials_provider_) {
      Aws::Auth::AWSCredentials credentials =
          credentials_provider_->GetAWSCredentials();
      if (credentials.IsExpiredOrEmpty()) {
        return LOG_STATUS(
            Status::S3Error(std::string("Credentials is expired or empty.")));
      }
    }
    return Status::Ok();
  }

  bool found;
  auto s3_endpoint_override = config_.get("vfs.s3.endpoint_override", &found);
  assert(found);

  // ClientConfiguration should be lazily init'ed here in init_client to avoid
  // potential slowdowns for non s3 users as the ClientConfig now attempts to
  // check for client configuration on create, which can be slow if aws is not
  // configured on a users systems due to ec2 metadata check

  client_config_ = tdb_unique_ptr<Aws::Client::ClientConfiguration>(
      tdb_new(Aws::Client::ClientConfiguration));

  // Route SDK async work through TileDB's own thread pool.
  s3_tp_executor_ = std::make_shared<S3ThreadPoolExecutor>(vfs_thread_pool_);

  client_config_->executor = s3_tp_executor_;

  auto& client_config = *client_config_.get();

  if (!region_.empty())
    client_config.region = region_.c_str();

  if (!s3_endpoint_override.empty())
    client_config.endpointOverride = s3_endpoint_override.c_str();

  // --- Proxy configuration ---
  auto proxy_host = config_.get("vfs.s3.proxy_host", &found);
  assert(found);

  uint32_t proxy_port = 0;
  RETURN_NOT_OK(
      config_.get<uint32_t>("vfs.s3.proxy_port", &proxy_port, &found));
  assert(found);

  auto proxy_username = config_.get("vfs.s3.proxy_username", &found);
  assert(found);

  auto proxy_password = config_.get("vfs.s3.proxy_password", &found);
  assert(found);

  auto proxy_scheme = config_.get("vfs.s3.proxy_scheme", &found);
  assert(found);

  if (!proxy_host.empty()) {
    client_config.proxyHost = proxy_host.c_str();
    client_config.proxyPort = proxy_port;
    client_config.proxyScheme = proxy_scheme == "https" ?
                                    Aws::Http::Scheme::HTTPS :
                                    Aws::Http::Scheme::HTTP;
    client_config.proxyUserName = proxy_username.c_str();
    client_config.proxyPassword = proxy_password.c_str();
  }

  // --- Scheme, timeouts, TLS ---
  auto s3_scheme = config_.get("vfs.s3.scheme", &found);
  assert(found);

  int64_t connect_timeout_ms = 0;
  RETURN_NOT_OK(config_.get<int64_t>(
      "vfs.s3.connect_timeout_ms", &connect_timeout_ms, &found));
  assert(found);

  int64_t request_timeout_ms = 0;
  RETURN_NOT_OK(config_.get<int64_t>(
      "vfs.s3.request_timeout_ms", &request_timeout_ms, &found));
  assert(found);

  auto ca_file = config_.get("vfs.s3.ca_file", &found);
  assert(found);

  auto ca_path = config_.get("vfs.s3.ca_path", &found);
  assert(found);

  bool verify_ssl = false;
  RETURN_NOT_OK(config_.get<bool>("vfs.s3.verify_ssl", &verify_ssl, &found));
  assert(found);

  // --- Credentials configuration ---
  auto aws_access_key_id = config_.get("vfs.s3.aws_access_key_id", &found);
  assert(found);

  auto aws_secret_access_key =
      config_.get("vfs.s3.aws_secret_access_key", &found);
  assert(found);

  auto aws_session_token = config_.get("vfs.s3.aws_session_token", &found);
  assert(found);

  auto aws_role_arn = config_.get("vfs.s3.aws_role_arn", &found);
  assert(found);

  auto aws_external_id = config_.get("vfs.s3.aws_external_id", &found);
  assert(found);

  auto aws_load_frequency = config_.get("vfs.s3.aws_load_frequency", &found);
  assert(found);

  auto aws_session_name = config_.get("vfs.s3.aws_session_name", &found);
  assert(found);

  // --- Retry configuration ---
  int64_t connect_max_tries = 0;
  RETURN_NOT_OK(config_.get<int64_t>(
      "vfs.s3.connect_max_tries", &connect_max_tries, &found));
  assert(found);

  int64_t connect_scale_factor = 0;
  RETURN_NOT_OK(config_.get<int64_t>(
      "vfs.s3.connect_scale_factor", &connect_scale_factor, &found));
  assert(found);

  client_config.scheme = (s3_scheme == "http") ? Aws::Http::Scheme::HTTP :
                                                 Aws::Http::Scheme::HTTPS;
  client_config.connectTimeoutMs = (long)connect_timeout_ms;
  client_config.requestTimeoutMs = (long)request_timeout_ms;
  client_config.caFile = ca_file.c_str();
  client_config.caPath = ca_path.c_str();
  client_config.verifySSL = verify_ssl;

  // Use a custom retry strategy so retry counts feed into TileDB stats.
  client_config.retryStrategy = Aws::MakeShared<S3RetryStrategy>(
      constants::s3_allocation_tag.c_str(),
      stats_,
      connect_max_tries,
      connect_scale_factor);

#ifdef __linux__
  // If the user has not set a s3 ca file or ca path then let's attempt to set
  // the cert file if we've autodetected it
  if (ca_file.empty() && ca_path.empty()) {
    const std::string cert_file =
        global_state::GlobalState::GetGlobalState().cert_file();
    if (!cert_file.empty()) {
      client_config.caFile = cert_file.c_str();
    }
  }
#endif

  // Encode the credential combination as a bitmask: bit 0 = access key id,
  // bit 1 = secret key, bit 2 = role ARN. Only "none" (0), "both static
  // keys" (3), or "role only" (4) are valid combinations.
  switch ((!aws_access_key_id.empty() ? 1 : 0) +
          (!aws_secret_access_key.empty() ? 2 : 0) +
          (!aws_role_arn.empty() ? 4 : 0)) {
    case 0:
      // No explicit credentials: the SDK's default provider chain is used.
      break;
    case 1:
    case 2:
      return Status::S3Error(
          "Insufficient authentication credentials; "
          "Both access key id and secret key are needed");
    case 3: {
      // Static credentials (access key id + secret key, optional session
      // token).
      Aws::String access_key_id(aws_access_key_id.c_str());
      Aws::String secret_access_key(aws_secret_access_key.c_str());
      Aws::String session_token(
          !aws_session_token.empty() ? aws_session_token.c_str() : "");
      credentials_provider_ = tdb_make_shared(
          Aws::Auth::SimpleAWSCredentialsProvider,
          access_key_id,
          secret_access_key,
          session_token);
      break;
    }
    case 4: {
      // If AWS Role ARN provided instead of access_key and secret_key,
      // temporary credentials will be fetched by assuming this role.
      Aws::String role_arn(aws_role_arn.c_str());
      Aws::String external_id(
          !aws_external_id.empty() ? aws_external_id.c_str() : "");
      int load_frequency(
          !aws_load_frequency.empty() ?
              std::stoi(aws_load_frequency) :
              Aws::Auth::DEFAULT_CREDS_LOAD_FREQ_SECONDS);
      Aws::String session_name(
          !aws_session_name.empty() ? aws_session_name.c_str() : "");
      credentials_provider_ = tdb_make_shared(
          Aws::Auth::STSAssumeRoleCredentialsProvider,
          role_arn,
          session_name,
          external_id,
          load_frequency,
          nullptr);
      break;
    }
    default:
      // Static keys AND a role ARN were both supplied.
      return Status::S3Error(
          "Ambiguous authentication credentials; both permanent and temporary "
          "authentication credentials are configured");
  }

  // The `Aws::S3::S3Client` constructor is not thread-safe. Although we
  // currently hold `client_init_mtx_` that protects this routine from threads
  // on this instance of `S3`, it is not sufficient protection from threads on
  // another instance of `S3`. Use an additional, static mutex for this
  // scenario.
  static std::mutex static_client_init_mtx;
  {
    std::lock_guard<std::mutex> static_lck(static_client_init_mtx);

    if (credentials_provider_ == nullptr) {
      client_ = tdb_make_shared(
          Aws::S3::S3Client,
          *client_config_,
          Aws::Client::AWSAuthV4Signer::PayloadSigningPolicy::Never,
          use_virtual_addressing_);
    } else {
      client_ = tdb_make_shared(
          Aws::S3::S3Client,
          credentials_provider_.inner_sp(),
          *client_config_,
          Aws::Client::AWSAuthV4Signer::PayloadSigningPolicy::Never,
          use_virtual_addressing_);
    }
  }

  return Status::Ok();
}
1255
copy_object(const URI & old_uri,const URI & new_uri)1256 Status S3::copy_object(const URI& old_uri, const URI& new_uri) {
1257 RETURN_NOT_OK(init_client());
1258
1259 Aws::Http::URI src_uri = old_uri.c_str();
1260 Aws::Http::URI dst_uri = new_uri.c_str();
1261 Aws::S3::Model::CopyObjectRequest copy_object_request;
1262 copy_object_request.SetCopySource(
1263 join_authority_and_path(
1264 src_uri.GetAuthority().c_str(), src_uri.GetPath().c_str())
1265 .c_str());
1266 copy_object_request.SetBucket(dst_uri.GetAuthority());
1267 copy_object_request.SetKey(dst_uri.GetPath());
1268 if (request_payer_ != Aws::S3::Model::RequestPayer::NOT_SET)
1269 copy_object_request.SetRequestPayer(request_payer_);
1270 if (sse_ != Aws::S3::Model::ServerSideEncryption::NOT_SET)
1271 copy_object_request.SetServerSideEncryption(sse_);
1272 if (!sse_kms_key_id_.empty())
1273 copy_object_request.SetSSEKMSKeyId(Aws::String(sse_kms_key_id_.c_str()));
1274 if (object_canned_acl_ != Aws::S3::Model::ObjectCannedACL::NOT_SET) {
1275 copy_object_request.SetACL(object_canned_acl_);
1276 }
1277
1278 auto copy_object_outcome = client_->CopyObject(copy_object_request);
1279 if (!copy_object_outcome.IsSuccess()) {
1280 return LOG_STATUS(Status::S3Error(
1281 std::string("Failed to copy S3 object ") + old_uri.c_str() + " to " +
1282 new_uri.c_str() + outcome_error_message(copy_object_outcome)));
1283 }
1284
1285 wait_for_object_to_propagate(
1286 copy_object_request.GetBucket(), copy_object_request.GetKey());
1287
1288 return Status::Ok();
1289 }
1290
fill_file_buffer(Buffer * buff,const void * buffer,uint64_t length,uint64_t * nbytes_filled)1291 Status S3::fill_file_buffer(
1292 Buffer* buff,
1293 const void* buffer,
1294 uint64_t length,
1295 uint64_t* nbytes_filled) {
1296 *nbytes_filled = std::min(file_buffer_size_ - buff->size(), length);
1297 if (*nbytes_filled > 0)
1298 RETURN_NOT_OK(buff->write(buffer, *nbytes_filled));
1299
1300 return Status::Ok();
1301 }
1302
add_front_slash(const std::string & path) const1303 std::string S3::add_front_slash(const std::string& path) const {
1304 return (path.front() != '/') ? (std::string("/") + path) : path;
1305 }
1306
remove_front_slash(const std::string & path) const1307 std::string S3::remove_front_slash(const std::string& path) const {
1308 if (path.front() == '/')
1309 return path.substr(1, path.length());
1310 return path;
1311 }
1312
remove_trailing_slash(const std::string & path) const1313 std::string S3::remove_trailing_slash(const std::string& path) const {
1314 if (path.back() == '/') {
1315 return path.substr(0, path.length() - 1);
1316 }
1317
1318 return path;
1319 }
1320
flush_file_buffer(const URI & uri,Buffer * buff,bool last_part)1321 Status S3::flush_file_buffer(const URI& uri, Buffer* buff, bool last_part) {
1322 RETURN_NOT_OK(init_client());
1323 if (buff->size() > 0) {
1324 const Status st =
1325 write_multipart(uri, buff->data(), buff->size(), last_part);
1326 buff->reset_size();
1327 RETURN_NOT_OK(st);
1328 }
1329
1330 return Status::Ok();
1331 }
1332
get_file_buffer(const URI & uri,Buffer ** buff)1333 Status S3::get_file_buffer(const URI& uri, Buffer** buff) {
1334 // TODO: remove this?
1335 std::unique_lock<std::mutex> lck(file_buffers_mtx_);
1336
1337 auto uri_str = uri.to_string();
1338 auto it = file_buffers_.find(uri_str);
1339 if (it == file_buffers_.end()) {
1340 auto new_buff = tdb_new(Buffer);
1341 file_buffers_[uri_str] = new_buff;
1342 *buff = new_buff;
1343 } else {
1344 *buff = it->second;
1345 }
1346
1347 return Status::Ok();
1348 }
1349
initiate_multipart_request(Aws::Http::URI aws_uri,MultiPartUploadState * state)1350 Status S3::initiate_multipart_request(
1351 Aws::Http::URI aws_uri, MultiPartUploadState* state) {
1352 RETURN_NOT_OK(init_client());
1353
1354 auto path = aws_uri.GetPath();
1355 std::string path_c_str = path.c_str();
1356 Aws::S3::Model::CreateMultipartUploadRequest multipart_upload_request;
1357 multipart_upload_request.SetBucket(aws_uri.GetAuthority());
1358 multipart_upload_request.SetKey(path);
1359 multipart_upload_request.SetContentType("application/octet-stream");
1360 if (request_payer_ != Aws::S3::Model::RequestPayer::NOT_SET)
1361 multipart_upload_request.SetRequestPayer(request_payer_);
1362 if (sse_ != Aws::S3::Model::ServerSideEncryption::NOT_SET)
1363 multipart_upload_request.SetServerSideEncryption(sse_);
1364 if (!sse_kms_key_id_.empty())
1365 multipart_upload_request.SetSSEKMSKeyId(
1366 Aws::String(sse_kms_key_id_.c_str()));
1367 if (object_canned_acl_ != Aws::S3::Model::ObjectCannedACL::NOT_SET) {
1368 multipart_upload_request.SetACL(object_canned_acl_);
1369 }
1370
1371 auto multipart_upload_outcome =
1372 client_->CreateMultipartUpload(multipart_upload_request);
1373 if (!multipart_upload_outcome.IsSuccess()) {
1374 return LOG_STATUS(Status::S3Error(
1375 std::string("Failed to create multipart request for object '") +
1376 path_c_str + outcome_error_message(multipart_upload_outcome)));
1377 }
1378
1379 state->part_number = 1;
1380 state->bucket = aws_uri.GetAuthority();
1381 state->key = path;
1382 state->upload_id = multipart_upload_outcome.GetResult().GetUploadId();
1383 state->completed_parts = std::map<int, Aws::S3::Model::CompletedPart>();
1384
1385 *state = MultiPartUploadState(
1386 1,
1387 Aws::String(aws_uri.GetAuthority()),
1388 Aws::String(path),
1389 Aws::String(multipart_upload_outcome.GetResult().GetUploadId()),
1390 std::map<int, Aws::S3::Model::CompletedPart>());
1391
1392 return Status::Ok();
1393 }
1394
join_authority_and_path(const std::string & authority,const std::string & path) const1395 std::string S3::join_authority_and_path(
1396 const std::string& authority, const std::string& path) const {
1397 bool path_has_slash = !path.empty() && path.front() == '/';
1398 bool authority_has_slash = !authority.empty() && authority.back() == '/';
1399 bool need_slash = !(path_has_slash || authority_has_slash);
1400 return authority + (need_slash ? "/" : "") + path;
1401 }
1402
wait_for_object_to_propagate(const Aws::String & bucket_name,const Aws::String & object_key) const1403 Status S3::wait_for_object_to_propagate(
1404 const Aws::String& bucket_name, const Aws::String& object_key) const {
1405 init_client();
1406
1407 unsigned attempts_cnt = 0;
1408 while (attempts_cnt++ < constants::s3_max_attempts) {
1409 bool exists;
1410 RETURN_NOT_OK(is_object(bucket_name, object_key, &exists));
1411 if (exists) {
1412 return Status::Ok();
1413 }
1414
1415 std::this_thread::sleep_for(
1416 std::chrono::milliseconds(constants::s3_attempt_sleep_ms));
1417 }
1418
1419 return LOG_STATUS(Status::S3Error(
1420 "Failed waiting for object " +
1421 std::string(object_key.c_str(), object_key.size()) + " to be created."));
1422 }
1423
wait_for_object_to_be_deleted(const Aws::String & bucket_name,const Aws::String & object_key) const1424 Status S3::wait_for_object_to_be_deleted(
1425 const Aws::String& bucket_name, const Aws::String& object_key) const {
1426 init_client();
1427
1428 unsigned attempts_cnt = 0;
1429 while (attempts_cnt++ < constants::s3_max_attempts) {
1430 bool exists;
1431 RETURN_NOT_OK(is_object(bucket_name, object_key, &exists));
1432 if (!exists) {
1433 return Status::Ok();
1434 }
1435
1436 std::this_thread::sleep_for(
1437 std::chrono::milliseconds(constants::s3_attempt_sleep_ms));
1438 }
1439
1440 return LOG_STATUS(Status::S3Error(
1441 "Failed waiting for object " +
1442 std::string(object_key.c_str(), object_key.size()) + " to be deleted."));
1443 }
1444
wait_for_bucket_to_be_created(const URI & bucket_uri) const1445 Status S3::wait_for_bucket_to_be_created(const URI& bucket_uri) const {
1446 init_client();
1447
1448 unsigned attempts_cnt = 0;
1449 while (attempts_cnt++ < constants::s3_max_attempts) {
1450 bool exists;
1451 RETURN_NOT_OK(is_bucket(bucket_uri, &exists));
1452 if (exists) {
1453 return Status::Ok();
1454 }
1455
1456 std::this_thread::sleep_for(
1457 std::chrono::milliseconds(constants::s3_attempt_sleep_ms));
1458 }
1459
1460 return LOG_STATUS(Status::S3Error(
1461 "Failed waiting for bucket " + bucket_uri.to_string() +
1462 " to be created."));
1463 }
1464
flush_direct(const URI & uri)1465 Status S3::flush_direct(const URI& uri) {
1466 RETURN_NOT_OK(init_client());
1467
1468 // Get file buffer
1469 auto buff = (Buffer*)nullptr;
1470 RETURN_NOT_OK(get_file_buffer(uri, &buff));
1471
1472 const Aws::Http::URI aws_uri(uri.c_str());
1473 const std::string uri_path(aws_uri.GetPath().c_str());
1474
1475 Aws::S3::Model::PutObjectRequest put_object_request;
1476
1477 auto stream = std::shared_ptr<Aws::IOStream>(
1478 new boost::interprocess::bufferstream((char*)buff->data(), buff->size()));
1479
1480 put_object_request.SetBody(stream);
1481 put_object_request.SetContentLength(buff->size());
1482
1483 // we only want to hash once, and must do it after setting the body
1484 auto md5_hash =
1485 Aws::Utils::HashingUtils::CalculateMD5(*put_object_request.GetBody());
1486
1487 put_object_request.SetContentMD5(
1488 Aws::Utils::HashingUtils::Base64Encode(md5_hash));
1489 put_object_request.SetContentType("application/octet-stream");
1490 put_object_request.SetBucket(aws_uri.GetAuthority());
1491 put_object_request.SetKey(aws_uri.GetPath());
1492 if (request_payer_ != Aws::S3::Model::RequestPayer::NOT_SET)
1493 put_object_request.SetRequestPayer(request_payer_);
1494 if (sse_ != Aws::S3::Model::ServerSideEncryption::NOT_SET)
1495 put_object_request.SetServerSideEncryption(sse_);
1496 if (!sse_kms_key_id_.empty())
1497 put_object_request.SetSSEKMSKeyId(Aws::String(sse_kms_key_id_.c_str()));
1498 if (object_canned_acl_ != Aws::S3::Model::ObjectCannedACL::NOT_SET) {
1499 put_object_request.SetACL(object_canned_acl_);
1500 }
1501
1502 auto put_object_outcome = client_->PutObject(put_object_request);
1503 if (!put_object_outcome.IsSuccess()) {
1504 return LOG_STATUS(Status::S3Error(
1505 std::string("Cannot write object '") + uri.c_str() +
1506 outcome_error_message(put_object_outcome)));
1507 }
1508
1509 // verify the MD5 hash of the result
1510 // note the etag is hex-encoded not base64
1511 Aws::StringStream md5_hex;
1512 md5_hex << "\"" << Aws::Utils::HashingUtils::HexEncode(md5_hash) << "\"";
1513 if (md5_hex.str() != put_object_outcome.GetResult().GetETag()) {
1514 return LOG_STATUS(
1515 Status::S3Error("Object uploaded successfully, but MD5 hash does not "
1516 "match result from server!' "));
1517 }
1518
1519 wait_for_object_to_propagate(
1520 put_object_request.GetBucket(), put_object_request.GetKey());
1521
1522 return Status::Ok();
1523 }
1524
/**
 * Uploads `length` bytes as one or more multipart parts for the object at
 * `uri`, creating the multipart upload on first use of a URI.
 *
 * The per-URI upload state lives in `multipart_upload_states_`, protected by
 * a readers-writer lock for map lookup/insert plus a per-state mutex for
 * mutations. First-time writers upgrade to the write lock to insert the
 * state, delete any pre-existing object (overwrite semantics), and initiate
 * the multipart upload.
 *
 * @param uri The destination S3 URI.
 * @param buffer The bytes to upload.
 * @param length Number of bytes; must be a multiple of
 *     `multipart_part_size_` unless `last_part` is true.
 * @param last_part Whether this is the final part of the object.
 * @return Status error on invalid length, failed initiation, or any failed
 *     part upload (all parts are still awaited before returning).
 */
Status S3::write_multipart(
    const URI& uri, const void* buffer, uint64_t length, bool last_part) {
  RETURN_NOT_OK(init_client());

  // Ensure that each thread is responsible for exactly multipart_part_size_
  // bytes (except if this is the last write_multipart, in which case the final
  // thread should write less), and cap the number of parallel operations at the
  // configured max number. Length must be evenly divisible by
  // multipart_part_size_ unless this is the last part.
  uint64_t num_ops = last_part ?
                         utils::math::ceil(length, multipart_part_size_) :
                         (length / multipart_part_size_);
  num_ops = std::min(std::max(num_ops, uint64_t(1)), max_parallel_ops_);

  if (!last_part && length % multipart_part_size_ != 0) {
    return LOG_STATUS(
        Status::S3Error("Length not evenly divisible by part length"));
  }

  const Aws::Http::URI aws_uri(uri.c_str());
  const std::string uri_path(aws_uri.GetPath().c_str());

  MultiPartUploadState* state;
  std::unique_lock<std::mutex> state_lck;

  // Take a lock protecting the shared multipart data structures
  // Read lock to see if it exists
  UniqueReadLock unique_rl(&multipart_upload_rwlock_);

  auto state_iter = multipart_upload_states_.find(uri_path);
  if (state_iter == multipart_upload_states_.end()) {
    // If the state is new, we must grab write lock, so unlock from read and
    // grab write
    unique_rl.unlock();
    UniqueWriteLock unique_wl(&multipart_upload_rwlock_);

    // Since we switched locks we need to once again check to make sure another
    // thread didn't create the state
    state_iter = multipart_upload_states_.find(uri_path);
    if (state_iter == multipart_upload_states_.end()) {
      auto path = aws_uri.GetPath();
      std::string path_str = path.c_str();
      MultiPartUploadState new_state;

      assert(multipart_upload_states_.count(path_str) == 0);
      multipart_upload_states_.emplace(
          std::move(path_str), std::move(new_state));
      state = &multipart_upload_states_.at(uri_path);
      // Hold the per-state mutex before releasing the map's write lock so no
      // other thread can mutate this state while we initiate the upload.
      state_lck = std::unique_lock<std::mutex>(state->mtx);
      // Downgrade to read lock, expected below outside the create
      unique_wl.unlock();

      // Delete file if it exists (overwrite) and initiate multipart request
      bool exists;
      RETURN_NOT_OK(is_object(uri, &exists));
      if (exists) {
        RETURN_NOT_OK(remove_object(uri));
      }

      const Status st = initiate_multipart_request(aws_uri, state);
      if (!st.ok()) {
        return st;
      }
    } else {
      // If another thread switched state, switch back to a read lock
      state = &multipart_upload_states_.at(uri_path);

      // Lock multipart state
      state_lck = std::unique_lock<std::mutex>(state->mtx);
    }
  } else {
    state = &multipart_upload_states_.at(uri_path);

    // Lock multipart state
    state_lck = std::unique_lock<std::mutex>(state->mtx);

    // Unlock, as make_upload_part_req will reaquire as necessary.
    unique_rl.unlock();
  }

  // Get the upload ID
  const auto upload_id = state->upload_id;

  // Assign the part number(s), and make the write request.
  if (num_ops == 1) {
    // Single-part fast path: reserve one part number, release the state
    // lock, and upload synchronously.
    const int part_num = state->part_number++;
    state_lck.unlock();

    auto ctx =
        make_upload_part_req(aws_uri, buffer, length, upload_id, part_num);
    return get_make_upload_part_req(uri, uri_path, ctx);
  } else {
    // Parallel path: slice the buffer into part-sized chunks, reserve a
    // contiguous range of part numbers, then issue all uploads before
    // awaiting any of them.
    std::vector<MakeUploadPartCtx> ctx_vec;
    ctx_vec.reserve(num_ops);
    const uint64_t bytes_per_op = multipart_part_size_;
    const int part_num_base = state->part_number;
    for (uint64_t i = 0; i < num_ops; i++) {
      uint64_t begin = i * bytes_per_op,
               end = std::min((i + 1) * bytes_per_op - 1, length - 1);
      uint64_t thread_nbytes = end - begin + 1;
      auto thread_buffer = reinterpret_cast<const char*>(buffer) + begin;
      int part_num = static_cast<int>(part_num_base + i);
      auto ctx = make_upload_part_req(
          aws_uri, thread_buffer, thread_nbytes, upload_id, part_num);
      ctx_vec.emplace_back(std::move(ctx));
    }
    state->part_number += num_ops;
    state_lck.unlock();

    // Await every outstanding part; report the last failure (if any) after
    // all have completed so no upload is left dangling.
    Status aggregate_st = Status::Ok();
    for (auto& ctx : ctx_vec) {
      const Status st = get_make_upload_part_req(uri, uri_path, ctx);
      if (!st.ok()) {
        aggregate_st = st;
      }
    }

    if (!aggregate_st.ok()) {
      std::stringstream errmsg;
      errmsg << "S3 parallel write multipart error; " << aggregate_st.message();
      LOG_STATUS(Status::S3Error(errmsg.str()));
    }
    return aggregate_st;
  }
}
1650
make_upload_part_req(const Aws::Http::URI & aws_uri,const void * const buffer,const uint64_t length,const Aws::String & upload_id,const int upload_part_num)1651 S3::MakeUploadPartCtx S3::make_upload_part_req(
1652 const Aws::Http::URI& aws_uri,
1653 const void* const buffer,
1654 const uint64_t length,
1655 const Aws::String& upload_id,
1656 const int upload_part_num) {
1657 auto stream = std::shared_ptr<Aws::IOStream>(
1658 new boost::interprocess::bufferstream((char*)buffer, length));
1659
1660 Aws::S3::Model::UploadPartRequest upload_part_request;
1661 upload_part_request.SetBucket(aws_uri.GetAuthority());
1662 upload_part_request.SetKey(aws_uri.GetPath());
1663 upload_part_request.SetPartNumber(upload_part_num);
1664 upload_part_request.SetUploadId(upload_id);
1665 upload_part_request.SetBody(stream);
1666 upload_part_request.SetContentMD5(Aws::Utils::HashingUtils::Base64Encode(
1667 Aws::Utils::HashingUtils::CalculateMD5(*stream)));
1668 upload_part_request.SetContentLength(length);
1669 if (request_payer_ != Aws::S3::Model::RequestPayer::NOT_SET)
1670 upload_part_request.SetRequestPayer(request_payer_);
1671
1672 auto upload_part_outcome_callable =
1673 client_->UploadPartCallable(upload_part_request);
1674
1675 MakeUploadPartCtx ctx(
1676 std::move(upload_part_outcome_callable), upload_part_num);
1677 return ctx;
1678 }
1679
/**
 * Awaits the outcome of an asynchronous part upload started by
 * make_upload_part_req() and records the result in the URI's multipart
 * upload state: the completed part's ETag on success, or an error Status on
 * failure (so flush can later abort the upload).
 *
 * @param uri The destination S3 URI (used only for error messages).
 * @param uri_path Key into `multipart_upload_states_`.
 * @param ctx Context holding the outcome future and part number.
 * @return Status error if the part upload failed (or was forced to fail by
 *     the unit-test fault-injection config).
 */
Status S3::get_make_upload_part_req(
    const URI& uri, const std::string& uri_path, MakeUploadPartCtx& ctx) {
  RETURN_NOT_OK(init_client());

  // Block until the asynchronous upload completes.
  auto upload_part_outcome = ctx.upload_part_outcome_callable.get();
  bool success = upload_part_outcome.IsSuccess();

  // Unit-test fault injection: force every Nth part upload to fail.
  static const UnitTestConfig& unit_test_cfg = UnitTestConfig::instance();
  if (unit_test_cfg.s3_fail_every_nth_upload_request.is_set() &&
      ctx.upload_part_num %
              unit_test_cfg.s3_fail_every_nth_upload_request.get() ==
          0) {
    success = false;
  }

  if (!success) {
    // Record the failure in the shared state. Lock order: map read lock,
    // then per-state mutex, then release the map lock.
    UniqueReadLock unique_rl(&multipart_upload_rwlock_);
    auto state = &multipart_upload_states_.at(uri_path);
    Status st = Status::S3Error(
        std::string("Failed to upload part of S3 object '") + uri.c_str() +
        outcome_error_message(upload_part_outcome));
    // Lock multipart state
    std::unique_lock<std::mutex> state_lck(state->mtx);
    unique_rl.unlock();
    state->st = st;
    return LOG_STATUS(st);
  }

  // Record the part's ETag; CompleteMultipartUpload needs it at flush time.
  Aws::S3::Model::CompletedPart completed_part;
  completed_part.SetETag(upload_part_outcome.GetResult().GetETag());
  completed_part.SetPartNumber(ctx.upload_part_num);

  // Same lock order as the failure path above.
  UniqueReadLock unique_rl(&multipart_upload_rwlock_);
  auto state = &multipart_upload_states_.at(uri_path);
  // Lock multipart state
  std::unique_lock<std::mutex> state_lck(state->mtx);
  unique_rl.unlock();
  state->completed_parts.emplace(
      ctx.upload_part_num, std::move(completed_part));
  state_lck.unlock();

  return Status::Ok();
}
1723
1724 } // namespace sm
1725 } // namespace tiledb
1726
1727 #endif
1728