1 // Licensed to the Apache Software Foundation (ASF) under one
2 // or more contributor license agreements. See the NOTICE file
3 // distributed with this work for additional information
4 // regarding copyright ownership. The ASF licenses this file
5 // to you under the Apache License, Version 2.0 (the
6 // "License"); you may not use this file except in compliance
7 // with the License. You may obtain a copy of the License at
8 //
9 // http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing,
12 // software distributed under the License is distributed on an
13 // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14 // KIND, either express or implied. See the License for the
15 // specific language governing permissions and limitations
16 // under the License.
17
18 #include "arrow/filesystem/s3fs.h"
19
20 #include <algorithm>
21 #include <atomic>
22 #include <condition_variable>
23 #include <mutex>
24 #include <sstream>
25 #include <unordered_map>
26 #include <utility>
27
28 #ifdef _WIN32
29 // Undefine preprocessor macros that interfere with AWS function / method names
30 #ifdef GetMessage
31 #undef GetMessage
32 #endif
33 #ifdef GetObject
34 #undef GetObject
35 #endif
36 #endif
37
38 #include <aws/core/Aws.h>
39 #include <aws/core/auth/AWSCredentials.h>
40 #include <aws/core/auth/AWSCredentialsProviderChain.h>
41 #include <aws/core/client/RetryStrategy.h>
42 #include <aws/core/utils/logging/ConsoleLogSystem.h>
43 #include <aws/core/utils/stream/PreallocatedStreamBuf.h>
44 #include <aws/s3/S3Client.h>
45 #include <aws/s3/model/AbortMultipartUploadRequest.h>
46 #include <aws/s3/model/CompleteMultipartUploadRequest.h>
47 #include <aws/s3/model/CompletedMultipartUpload.h>
48 #include <aws/s3/model/CompletedPart.h>
49 #include <aws/s3/model/CopyObjectRequest.h>
50 #include <aws/s3/model/CreateBucketRequest.h>
51 #include <aws/s3/model/CreateMultipartUploadRequest.h>
52 #include <aws/s3/model/DeleteBucketRequest.h>
53 #include <aws/s3/model/DeleteObjectRequest.h>
54 #include <aws/s3/model/DeleteObjectsRequest.h>
55 #include <aws/s3/model/GetObjectRequest.h>
56 #include <aws/s3/model/HeadBucketRequest.h>
57 #include <aws/s3/model/HeadObjectRequest.h>
58 #include <aws/s3/model/ListBucketsResult.h>
59 #include <aws/s3/model/ListObjectsV2Request.h>
60 #include <aws/s3/model/PutObjectRequest.h>
61 #include <aws/s3/model/UploadPartRequest.h>
62
63 #include "arrow/buffer.h"
64 #include "arrow/filesystem/filesystem.h"
65 #include "arrow/filesystem/path_util.h"
66 #include "arrow/filesystem/s3_internal.h"
67 #include "arrow/io/interfaces.h"
68 #include "arrow/io/memory.h"
69 #include "arrow/io/util_internal.h"
70 #include "arrow/result.h"
71 #include "arrow/status.h"
72 #include "arrow/util/checked_cast.h"
73 #include "arrow/util/logging.h"
74 #include "arrow/util/windows_fixup.h"
75
76 namespace arrow {
77
78 using internal::Uri;
79
80 namespace fs {
81
82 using ::Aws::Client::AWSError;
83 using ::Aws::S3::S3Errors;
84 namespace S3Model = Aws::S3::Model;
85
86 using ::arrow::fs::internal::ConnectRetryStrategy;
87 using ::arrow::fs::internal::ErrorToStatus;
88 using ::arrow::fs::internal::FromAwsDatetime;
89 using ::arrow::fs::internal::FromAwsString;
90 using ::arrow::fs::internal::IsAlreadyExists;
91 using ::arrow::fs::internal::IsNotFound;
92 using ::arrow::fs::internal::OutcomeToResult;
93 using ::arrow::fs::internal::OutcomeToStatus;
94 using ::arrow::fs::internal::ToAwsString;
95 using ::arrow::fs::internal::ToURLEncodedAwsString;
96
// Default AWS region used when no region is configured explicitly.
const char* kS3DefaultRegion = "us-east-1";

// Separator between S3 path components ("bucket/key/parts...").
static const char kSep = '/';
100
namespace {

// Global AWS SDK state, shared across all S3 filesystem instances.
// `aws_init_lock` guards initialization/finalization; `aws_initialized`
// is additionally read lock-free by CheckS3Initialized() below.
std::mutex aws_init_lock;
Aws::SDKOptions aws_options;
std::atomic<bool> aws_initialized(false);

// Initialize the AWS SDK with the given options.
// Expects `aws_init_lock` to be held by the caller.
Status DoInitializeS3(const S3GlobalOptions& options) {
  Aws::Utils::Logging::LogLevel aws_log_level;

// Map our S3LogLevel enumerators onto the AWS SDK's identically-named ones.
#define LOG_LEVEL_CASE(level_name)                             \
  case S3LogLevel::level_name:                                 \
    aws_log_level = Aws::Utils::Logging::LogLevel::level_name; \
    break;

  switch (options.log_level) {
    LOG_LEVEL_CASE(Fatal)
    LOG_LEVEL_CASE(Error)
    LOG_LEVEL_CASE(Warn)
    LOG_LEVEL_CASE(Info)
    LOG_LEVEL_CASE(Debug)
    LOG_LEVEL_CASE(Trace)
    default:
      // S3LogLevel::Off (or any unknown value) disables SDK logging
      aws_log_level = Aws::Utils::Logging::LogLevel::Off;
  }

#undef LOG_LEVEL_CASE

  aws_options.loggingOptions.logLevel = aws_log_level;
  // By default the AWS SDK logs to files, log to console instead
  aws_options.loggingOptions.logger_create_fn = [] {
    return std::make_shared<Aws::Utils::Logging::ConsoleLogSystem>(
        aws_options.loggingOptions.logLevel);
  };
  Aws::InitAPI(aws_options);
  aws_initialized.store(true);
  return Status::OK();
}

}  // namespace
140
InitializeS3(const S3GlobalOptions & options)141 Status InitializeS3(const S3GlobalOptions& options) {
142 std::lock_guard<std::mutex> lock(aws_init_lock);
143 return DoInitializeS3(options);
144 }
145
FinalizeS3()146 Status FinalizeS3() {
147 std::lock_guard<std::mutex> lock(aws_init_lock);
148 Aws::ShutdownAPI(aws_options);
149 aws_initialized.store(false);
150 return Status::OK();
151 }
152
EnsureS3Initialized()153 Status EnsureS3Initialized() {
154 std::lock_guard<std::mutex> lock(aws_init_lock);
155 if (!aws_initialized.load()) {
156 S3GlobalOptions options{S3LogLevel::Fatal};
157 return DoInitializeS3(options);
158 }
159 return Status::OK();
160 }
161
ConfigureDefaultCredentials()162 void S3Options::ConfigureDefaultCredentials() {
163 credentials_provider =
164 std::make_shared<Aws::Auth::DefaultAWSCredentialsProviderChain>();
165 }
166
ConfigureAccessKey(const std::string & access_key,const std::string & secret_key)167 void S3Options::ConfigureAccessKey(const std::string& access_key,
168 const std::string& secret_key) {
169 credentials_provider = std::make_shared<Aws::Auth::SimpleAWSCredentialsProvider>(
170 ToAwsString(access_key), ToAwsString(secret_key));
171 }
172
GetAccessKey() const173 std::string S3Options::GetAccessKey() const {
174 auto credentials = credentials_provider->GetAWSCredentials();
175 return std::string(FromAwsString(credentials.GetAWSAccessKeyId()));
176 }
177
GetSecretKey() const178 std::string S3Options::GetSecretKey() const {
179 auto credentials = credentials_provider->GetAWSCredentials();
180 return std::string(FromAwsString(credentials.GetAWSSecretKey()));
181 }
182
Defaults()183 S3Options S3Options::Defaults() {
184 S3Options options;
185 options.ConfigureDefaultCredentials();
186 return options;
187 }
188
FromAccessKey(const std::string & access_key,const std::string & secret_key)189 S3Options S3Options::FromAccessKey(const std::string& access_key,
190 const std::string& secret_key) {
191 S3Options options;
192 options.ConfigureAccessKey(access_key, secret_key);
193 return options;
194 }
195
FromUri(const Uri & uri,std::string * out_path)196 Result<S3Options> S3Options::FromUri(const Uri& uri, std::string* out_path) {
197 S3Options options;
198
199 const auto bucket = uri.host();
200 auto path = uri.path();
201 if (bucket.empty()) {
202 if (!path.empty()) {
203 return Status::Invalid("Missing bucket name in S3 URI");
204 }
205 } else {
206 if (path.empty()) {
207 path = bucket;
208 } else {
209 if (path[0] != '/') {
210 return Status::Invalid("S3 URI should absolute, not relative");
211 }
212 path = bucket + path;
213 }
214 }
215 if (out_path != nullptr) {
216 *out_path = std::string(internal::RemoveTrailingSlash(path));
217 }
218
219 std::unordered_map<std::string, std::string> options_map;
220 ARROW_ASSIGN_OR_RAISE(const auto options_items, uri.query_items());
221 for (const auto& kv : options_items) {
222 options_map.emplace(kv.first, kv.second);
223 }
224
225 const auto username = uri.username();
226 if (!username.empty()) {
227 options.ConfigureAccessKey(username, uri.password());
228 } else {
229 options.ConfigureDefaultCredentials();
230 }
231
232 auto it = options_map.find("region");
233 if (it != options_map.end()) {
234 options.region = it->second;
235 }
236 it = options_map.find("scheme");
237 if (it != options_map.end()) {
238 options.scheme = it->second;
239 }
240 it = options_map.find("endpoint_override");
241 if (it != options_map.end()) {
242 options.endpoint_override = it->second;
243 }
244
245 return options;
246 }
247
FromUri(const std::string & uri_string,std::string * out_path)248 Result<S3Options> S3Options::FromUri(const std::string& uri_string,
249 std::string* out_path) {
250 Uri uri;
251 RETURN_NOT_OK(uri.Parse(uri_string));
252 return FromUri(uri, out_path);
253 }
254
Equals(const S3Options & other) const255 bool S3Options::Equals(const S3Options& other) const {
256 return (region == other.region && endpoint_override == other.endpoint_override &&
257 scheme == other.scheme && background_writes == other.background_writes &&
258 GetAccessKey() == other.GetAccessKey() &&
259 GetSecretKey() == other.GetSecretKey());
260 }
261
262 namespace {
263
CheckS3Initialized()264 Status CheckS3Initialized() {
265 if (!aws_initialized.load()) {
266 return Status::Invalid(
267 "S3 subsystem not initialized; please call InitializeS3() "
268 "before carrying out any S3-related operation");
269 }
270 return Status::OK();
271 }
272
273 // XXX Sanitize paths by removing leading slash?
274
// Decomposed representation of an S3 location ("bucket/key/parts...").
struct S3Path {
  std::string full_path;  // "bucket/key", without any trailing slash
  std::string bucket;     // first path component
  std::string key;        // remainder after the bucket; may be empty
  std::vector<std::string> key_parts;  // `key` split on '/'

  // Parse "bucket[/key...]" into `out`.  A trailing slash is stripped;
  // a leading slash is rejected.
  static Status FromString(const std::string& s, S3Path* out) {
    const auto src = internal::RemoveTrailingSlash(s);
    auto first_sep = src.find_first_of(kSep);
    if (first_sep == 0) {
      return Status::Invalid("Path cannot start with a separator ('", s, "')");
    }
    if (first_sep == std::string::npos) {
      // Bucket-only path, no key
      *out = {std::string(src), std::string(src), "", {}};
      return Status::OK();
    }
    out->full_path = std::string(src);
    out->bucket = std::string(src.substr(0, first_sep));
    out->key = std::string(src.substr(first_sep + 1));
    out->key_parts = internal::SplitAbstractPath(out->key);
    return Validate(out);
  }

  // Check all key components for validity, adding the full path to any
  // error message for context.
  static Status Validate(S3Path* path) {
    auto result = internal::ValidateAbstractPathParts(path->key_parts);
    if (!result.ok()) {
      return Status::Invalid(result.message(), " in path ", path->full_path);
    } else {
      return result;
    }
  }

  // Render the path for use in AWS request headers (e.g. "copy source").
  Aws::String ToURLEncodedAwsString() const {
    // URL-encode individual parts, not the '/' separator
    Aws::String res;
    res += internal::ToURLEncodedAwsString(bucket);
    for (const auto& part : key_parts) {
      res += kSep;
      res += internal::ToURLEncodedAwsString(part);
    }
    return res;
  }

  // Return the parent path ("bucket/a/b" -> "bucket/a").
  // Only meaningful when has_parent() is true.
  S3Path parent() const {
    DCHECK(!key_parts.empty());
    auto parent = S3Path{"", bucket, "", key_parts};
    parent.key_parts.pop_back();
    parent.key = internal::JoinAbstractPath(parent.key_parts);
    parent.full_path = parent.bucket + kSep + parent.key;
    return parent;
  }

  // True when the path points inside a bucket (i.e. has a non-empty key).
  bool has_parent() const { return !key.empty(); }

  bool empty() const { return bucket.empty() && key.empty(); }

  // Equality ignores `full_path` / `key_parts`, which are derived fields.
  bool operator==(const S3Path& other) const {
    return bucket == other.bucket && key == other.key;
  }
};
335
336 // XXX return in OutcomeToStatus instead?
// Canonical "does not exist" error for a parsed S3 path.
Status PathNotFound(const S3Path& path) {
  return Status::IOError("Path does not exist '", path.full_path, "'");
}
340
// Canonical "does not exist" error from raw bucket and key strings.
Status PathNotFound(const std::string& bucket, const std::string& key) {
  return Status::IOError("Path does not exist '", bucket, kSep, key, "'");
}
344
// Error for operations that require a regular file (a non-empty key).
Status NotAFile(const S3Path& path) {
  return Status::IOError("Not a regular file: '", path.full_path, "'");
}
348
ValidateFilePath(const S3Path & path)349 Status ValidateFilePath(const S3Path& path) {
350 if (path.bucket.empty() || path.key.empty()) {
351 return NotAFile(path);
352 }
353 return Status::OK();
354 }
355
// Format a HTTP range header value for the byte range
// [start, start + length), e.g. "bytes=0-9".
std::string FormatRange(int64_t start, int64_t length) {
  return "bytes=" + std::to_string(start) + "-" + std::to_string(start + length - 1);
}
362
// A non-copying iostream.
// See https://stackoverflow.com/questions/35322033/aws-c-sdk-uploadpart-times-out
// https://stackoverflow.com/questions/13059091/creating-an-input-stream-from-constant-memory
class StringViewStream : Aws::Utils::Stream::PreallocatedStreamBuf, public std::iostream {
 public:
  // Wrap the given memory area in an iostream without copying it.
  // NOTE: base order matters — the streambuf base must be constructed
  // before `this` is passed to the std::iostream constructor.
  // The AWS streambuf API requires a mutable pointer, hence the const_cast.
  StringViewStream(const void* data, int64_t nbytes)
      : Aws::Utils::Stream::PreallocatedStreamBuf(
            reinterpret_cast<unsigned char*>(const_cast<void*>(data)),
            static_cast<size_t>(nbytes)),
        std::iostream(this) {}
};
374
375 // By default, the AWS SDK reads object data into an auto-growing StringStream.
376 // To avoid copies, read directly into our preallocated buffer instead.
377 // See https://github.com/aws/aws-sdk-cpp/issues/64 for an alternative but
378 // functionally similar recipe.
AwsWriteableStreamFactory(void * data,int64_t nbytes)379 Aws::IOStreamFactory AwsWriteableStreamFactory(void* data, int64_t nbytes) {
380 return [=]() { return new StringViewStream(data, nbytes); };
381 }
382
GetObjectRange(Aws::S3::S3Client * client,const S3Path & path,int64_t start,int64_t length,void * out)383 Result<S3Model::GetObjectResult> GetObjectRange(Aws::S3::S3Client* client,
384 const S3Path& path, int64_t start,
385 int64_t length, void* out) {
386 S3Model::GetObjectRequest req;
387 req.SetBucket(ToAwsString(path.bucket));
388 req.SetKey(ToAwsString(path.key));
389 req.SetRange(ToAwsString(FormatRange(start, length)));
390 req.SetResponseStreamFactory(AwsWriteableStreamFactory(out, length));
391 return OutcomeToResult(client->GetObject(req));
392 }
393
// A RandomAccessFile that reads from a S3 object
class ObjectInputFile : public io::RandomAccessFile {
 public:
  // `client` is borrowed and must outlive this file object.
  ObjectInputFile(Aws::S3::S3Client* client, const S3Path& path)
      : client_(client), path_(path) {}

  Status Init() {
    // Issue a HEAD Object to get the content-length and ensure any
    // errors (e.g. file not found) don't wait until the first Read() call.
    S3Model::HeadObjectRequest req;
    req.SetBucket(ToAwsString(path_.bucket));
    req.SetKey(ToAwsString(path_.key));

    auto outcome = client_->HeadObject(req);
    if (!outcome.IsSuccess()) {
      if (IsNotFound(outcome.GetError())) {
        return PathNotFound(path_);
      } else {
        return ErrorToStatus(
            std::forward_as_tuple("When reading information for key '", path_.key,
                                  "' in bucket '", path_.bucket, "': "),
            outcome.GetError());
      }
    }
    content_length_ = outcome.GetResult().GetContentLength();
    DCHECK_GE(content_length_, 0);
    return Status::OK();
  }

  // Return an error if the stream was already closed.
  Status CheckClosed() const {
    if (closed_) {
      return Status::Invalid("Operation on closed stream");
    }
    return Status::OK();
  }

  // Validate a seek/read position against the object size.
  Status CheckPosition(int64_t position, const char* action) const {
    if (position < 0) {
      return Status::Invalid("Cannot ", action, " from negative position");
    }
    if (position > content_length_) {
      return Status::IOError("Cannot ", action, " past end of file");
    }
    return Status::OK();
  }

  // RandomAccessFile APIs

  // Closing is purely local: there is no server-side state to release.
  Status Close() override {
    closed_ = true;
    return Status::OK();
  }

  bool closed() const override { return closed_; }

  Result<int64_t> Tell() const override {
    RETURN_NOT_OK(CheckClosed());
    return pos_;
  }

  Result<int64_t> GetSize() override {
    RETURN_NOT_OK(CheckClosed());
    return content_length_;
  }

  Status Seek(int64_t position) override {
    RETURN_NOT_OK(CheckClosed());
    RETURN_NOT_OK(CheckPosition(position, "seek"));

    pos_ = position;
    return Status::OK();
  }

  Result<int64_t> ReadAt(int64_t position, int64_t nbytes, void* out) override {
    RETURN_NOT_OK(CheckClosed());
    RETURN_NOT_OK(CheckPosition(position, "read"));

    // Clamp to the object size; reads overlapping EOF return fewer bytes
    nbytes = std::min(nbytes, content_length_ - position);
    if (nbytes == 0) {
      return 0;
    }

    // Read the desired range of bytes
    ARROW_ASSIGN_OR_RAISE(S3Model::GetObjectResult result,
                          GetObjectRange(client_, path_, position, nbytes, out));

    // The response body was already written into `out` by the stream
    // factory; `ignore` advances the stream so that gcount() reports how
    // many bytes were actually received.
    auto& stream = result.GetBody();
    stream.ignore(nbytes);
    // NOTE: the stream is a stringstream by default, there is no actual error
    // to check for. However, stream.fail() may return true if EOF is reached.
    return stream.gcount();
  }

  Result<std::shared_ptr<Buffer>> ReadAt(int64_t position, int64_t nbytes) override {
    RETURN_NOT_OK(CheckClosed());
    RETURN_NOT_OK(CheckPosition(position, "read"));

    // No need to allocate more than the remaining number of bytes
    nbytes = std::min(nbytes, content_length_ - position);

    ARROW_ASSIGN_OR_RAISE(auto buf, AllocateResizableBuffer(nbytes));
    if (nbytes > 0) {
      ARROW_ASSIGN_OR_RAISE(int64_t bytes_read,
                            ReadAt(position, nbytes, buf->mutable_data()));
      DCHECK_LE(bytes_read, nbytes);
      // Shrink the buffer in case of a short read
      RETURN_NOT_OK(buf->Resize(bytes_read));
    }
    return std::move(buf);
  }

  Result<int64_t> Read(int64_t nbytes, void* out) override {
    ARROW_ASSIGN_OR_RAISE(int64_t bytes_read, ReadAt(pos_, nbytes, out));
    pos_ += bytes_read;
    return bytes_read;
  }

  Result<std::shared_ptr<Buffer>> Read(int64_t nbytes) override {
    ARROW_ASSIGN_OR_RAISE(auto buffer, ReadAt(pos_, nbytes));
    pos_ += buffer->size();
    return std::move(buffer);
  }

 protected:
  Aws::S3::S3Client* client_;  // not owned
  S3Path path_;
  bool closed_ = false;
  int64_t pos_ = 0;  // current position for the sequential Read() APIs
  int64_t content_length_ = -1;  // set by Init()
};
523
// Minimum size for each part of a multipart upload, except for the last part.
// AWS doc says "5 MB" but it's not clear whether those are MB or MiB,
// so I chose the safer value (5 MiB = 5 * 2^20 bytes).
// (see https://docs.aws.amazon.com/AmazonS3/latest/API/mpUploadUploadPart.html)
static constexpr int64_t kMinimumPartUpload = 5 * 1024 * 1024;
529
// An OutputStream that writes to a S3 object
class ObjectOutputStream : public io::OutputStream {
 protected:
  struct UploadState;

 public:
  // `client` and `options` are borrowed and must outlive this stream.
  ObjectOutputStream(Aws::S3::S3Client* client, const S3Path& path,
                     const S3Options& options)
      : client_(client), path_(path), options_(options) {}

  ~ObjectOutputStream() override {
    // For compliance with the rest of the IO stack, Close rather than Abort,
    // even though it may be more expensive.
    io::internal::CloseFromDestructor(this);
  }

  // Start the multipart upload; must be called before any Write().
  Status Init() {
    // Initiate the multi-part upload
    S3Model::CreateMultipartUploadRequest req;
    req.SetBucket(ToAwsString(path_.bucket));
    req.SetKey(ToAwsString(path_.key));

    auto outcome = client_->CreateMultipartUpload(req);
    if (!outcome.IsSuccess()) {
      return ErrorToStatus(
          std::forward_as_tuple("When initiating multiple part upload for key '",
                                path_.key, "' in bucket '", path_.bucket, "': "),
          outcome.GetError());
    }
    upload_id_ = outcome.GetResult().GetUploadId();
    upload_state_ = std::make_shared<UploadState>();
    closed_ = false;
    return Status::OK();
  }

  // Cancel the multipart upload, discarding any data written so far.
  Status Abort() override {
    if (closed_) {
      return Status::OK();
    }

    S3Model::AbortMultipartUploadRequest req;
    req.SetBucket(ToAwsString(path_.bucket));
    req.SetKey(ToAwsString(path_.key));
    req.SetUploadId(upload_id_);

    auto outcome = client_->AbortMultipartUpload(req);
    if (!outcome.IsSuccess()) {
      return ErrorToStatus(
          std::forward_as_tuple("When aborting multiple part upload for key '", path_.key,
                                "' in bucket '", path_.bucket, "': "),
          outcome.GetError());
    }
    current_part_.reset();
    closed_ = true;
    return Status::OK();
  }

  // OutputStream interface

  // Upload any remaining buffered data, wait for in-flight part uploads,
  // then complete the multipart upload (making the object visible).
  Status Close() override {
    if (closed_) {
      return Status::OK();
    }

    if (current_part_) {
      // Upload last part
      RETURN_NOT_OK(CommitCurrentPart());
    }

    // S3 mandates at least one part, upload an empty one if necessary
    if (part_number_ == 1) {
      RETURN_NOT_OK(UploadPart("", 0));
    }

    // Wait for in-progress uploads to finish (if async writes are enabled)
    RETURN_NOT_OK(Flush());

    // At this point, all part uploads have finished successfully
    DCHECK_GT(part_number_, 1);
    DCHECK_EQ(upload_state_->completed_parts.size(),
              static_cast<size_t>(part_number_ - 1));

    S3Model::CompletedMultipartUpload completed_upload;
    completed_upload.SetParts(upload_state_->completed_parts);
    S3Model::CompleteMultipartUploadRequest req;
    req.SetBucket(ToAwsString(path_.bucket));
    req.SetKey(ToAwsString(path_.key));
    req.SetUploadId(upload_id_);
    req.SetMultipartUpload(std::move(completed_upload));

    auto outcome = client_->CompleteMultipartUpload(req);
    if (!outcome.IsSuccess()) {
      return ErrorToStatus(
          std::forward_as_tuple("When completing multiple part upload for key '",
                                path_.key, "' in bucket '", path_.bucket, "': "),
          outcome.GetError());
    }

    closed_ = true;
    return Status::OK();
  }

  bool closed() const override { return closed_; }

  Result<int64_t> Tell() const override {
    if (closed_) {
      return Status::Invalid("Operation on closed stream");
    }
    return pos_;
  }

  Status Write(const std::shared_ptr<Buffer>& buffer) override {
    // Pass the buffer along so async part uploads can reference it
    // instead of copying the data.
    return DoWrite(buffer->data(), buffer->size(), buffer);
  }

  Status Write(const void* data, int64_t nbytes) override {
    return DoWrite(data, nbytes);
  }

  // Common implementation of both Write() overloads.
  // If `owned_buffer` is non-null it owns `data`, allowing a background
  // upload to keep the data alive without copying it.
  Status DoWrite(const void* data, int64_t nbytes,
                 std::shared_ptr<Buffer> owned_buffer = nullptr) {
    if (closed_) {
      return Status::Invalid("Operation on closed stream");
    }

    if (!current_part_ && nbytes >= part_upload_threshold_) {
      // No current part and data large enough, upload it directly
      // (without copying if the buffer is owned)
      RETURN_NOT_OK(UploadPart(data, nbytes, owned_buffer));
      pos_ += nbytes;
      return Status::OK();
    }
    // Can't upload data on its own, need to buffer it
    if (!current_part_) {
      ARROW_ASSIGN_OR_RAISE(current_part_,
                            io::BufferOutputStream::Create(part_upload_threshold_));
      current_part_size_ = 0;
    }
    RETURN_NOT_OK(current_part_->Write(data, nbytes));
    pos_ += nbytes;
    current_part_size_ += nbytes;

    if (current_part_size_ >= part_upload_threshold_) {
      // Current part large enough, upload it
      RETURN_NOT_OK(CommitCurrentPart());
    }

    return Status::OK();
  }

  // Wait for all in-flight background part uploads to complete and return
  // their combined status.  Data still buffered in `current_part_` is NOT
  // uploaded here (it may be below the minimum part size).
  Status Flush() override {
    if (closed_) {
      return Status::Invalid("Operation on closed stream");
    }
    // Wait for background writes to finish
    std::unique_lock<std::mutex> lock(upload_state_->mutex);
    upload_state_->cv.wait(lock,
                           [this]() { return upload_state_->parts_in_progress == 0; });
    return upload_state_->status;
  }

  // Upload-related helpers

  // Upload the currently buffered part and reset the buffer.
  Status CommitCurrentPart() {
    ARROW_ASSIGN_OR_RAISE(auto buf, current_part_->Finish());
    current_part_.reset();
    current_part_size_ = 0;
    return UploadPart(buf);
  }

  Status UploadPart(std::shared_ptr<Buffer> buffer) {
    return UploadPart(buffer->data(), buffer->size(), buffer);
  }

  // Upload one part, synchronously or in the background depending on
  // S3Options::background_writes.
  Status UploadPart(const void* data, int64_t nbytes,
                    std::shared_ptr<Buffer> owned_buffer = nullptr) {
    S3Model::UploadPartRequest req;
    req.SetBucket(ToAwsString(path_.bucket));
    req.SetKey(ToAwsString(path_.key));
    req.SetUploadId(upload_id_);
    req.SetPartNumber(part_number_);
    req.SetContentLength(nbytes);

    if (!options_.background_writes) {
      req.SetBody(std::make_shared<StringViewStream>(data, nbytes));
      auto outcome = client_->UploadPart(req);
      if (!outcome.IsSuccess()) {
        return UploadPartError(req, outcome);
      } else {
        AddCompletedPart(upload_state_, part_number_, outcome.GetResult());
      }
    } else {
      // Serialize bookkeeping against the completion handler below
      std::unique_lock<std::mutex> lock(upload_state_->mutex);
      auto state = upload_state_;  // Keep upload state alive in closure
      auto part_number = part_number_;

      // If the data isn't owned, make an immutable copy for the lifetime of the closure
      if (owned_buffer == nullptr) {
        ARROW_ASSIGN_OR_RAISE(owned_buffer, AllocateBuffer(nbytes));
        memcpy(owned_buffer->mutable_data(), data, nbytes);
      } else {
        DCHECK_EQ(data, owned_buffer->data());
        DCHECK_EQ(nbytes, owned_buffer->size());
      }
      req.SetBody(
          std::make_shared<StringViewStream>(owned_buffer->data(), owned_buffer->size()));

      // The handler captures `state` (shared_ptr) and `owned_buffer`, so
      // both stay alive until the background upload finishes, even if
      // this stream object is destroyed first.
      auto handler =
          [state, owned_buffer, part_number](
              const Aws::S3::S3Client*, const S3Model::UploadPartRequest& req,
              const S3Model::UploadPartOutcome& outcome,
              const std::shared_ptr<const Aws::Client::AsyncCallerContext>&) -> void {
        std::unique_lock<std::mutex> lock(state->mutex);
        if (!outcome.IsSuccess()) {
          // Fold the error into the shared status returned by Flush()
          state->status &= UploadPartError(req, outcome);
        } else {
          AddCompletedPart(state, part_number, outcome.GetResult());
        }
        // Notify completion, regardless of success / error status
        if (--state->parts_in_progress == 0) {
          state->cv.notify_all();
        }
      };
      ++upload_state_->parts_in_progress;
      client_->UploadPartAsync(req, handler);
    }

    ++part_number_;
    // With up to 10000 parts in an upload (S3 limit), a stream writing chunks
    // of exactly 5MB would be limited to 50GB total. To avoid that, we bump
    // the upload threshold every 100 parts. So the pattern is:
    // - part 1 to 99: 5MB threshold
    // - part 100 to 199: 10MB threshold
    // - part 200 to 299: 15MB threshold
    // ...
    // - part 9900 to 9999: 500MB threshold
    // So the total size limit is 2475000MB or ~2.4TB, while keeping manageable
    // chunk sizes and avoiding too much buffering in the common case of a small-ish
    // stream. If the limit's not enough, we can revisit.
    if (part_number_ % 100 == 0) {
      part_upload_threshold_ += kMinimumPartUpload;
    }

    return Status::OK();
  }

  // Record a successfully uploaded part.  Parts uploaded in the background
  // may complete out of order, so each result is stored in the slot given
  // by its (1-based) part number.
  static void AddCompletedPart(const std::shared_ptr<UploadState>& state, int part_number,
                               const S3Model::UploadPartResult& result) {
    S3Model::CompletedPart part;
    // Append ETag and part number for this uploaded part
    // (will be needed for upload completion in Close())
    part.SetPartNumber(part_number);
    part.SetETag(result.GetETag());
    int slot = part_number - 1;
    if (state->completed_parts.size() <= static_cast<size_t>(slot)) {
      state->completed_parts.resize(slot + 1);
    }
    DCHECK(!state->completed_parts[slot].PartNumberHasBeenSet());
    state->completed_parts[slot] = std::move(part);
  }

  // Build an error Status describing a failed part upload.
  static Status UploadPartError(const S3Model::UploadPartRequest& req,
                                const S3Model::UploadPartOutcome& outcome) {
    return ErrorToStatus(
        std::forward_as_tuple("When uploading part for key '", req.GetKey(),
                              "' in bucket '", req.GetBucket(), "': "),
        outcome.GetError());
  }

 protected:
  Aws::S3::S3Client* client_;  // not owned
  S3Path path_;
  const S3Options& options_;
  Aws::String upload_id_;
  bool closed_ = true;  // only becomes false after a successful Init()
  int64_t pos_ = 0;
  int32_t part_number_ = 1;  // next part number (1-based, per the S3 API)
  std::shared_ptr<io::BufferOutputStream> current_part_;
  int64_t current_part_size_ = 0;
  int64_t part_upload_threshold_ = kMinimumPartUpload;

  // This struct is kept alive through background writes to avoid problems
  // in the completion handler.
  struct UploadState {
    std::mutex mutex;
    // Signaled when parts_in_progress drops to zero
    std::condition_variable cv;
    Aws::Vector<S3Model::CompletedPart> completed_parts;
    int64_t parts_in_progress = 0;
    // Combined status of background part uploads, returned by Flush()
    Status status;

    UploadState() : status(Status::OK()) {}
  };
  std::shared_ptr<UploadState> upload_state_;
};
824
825 // This function assumes info->path() is already set
FileObjectToInfo(const S3Model::HeadObjectResult & obj,FileInfo * info)826 void FileObjectToInfo(const S3Model::HeadObjectResult& obj, FileInfo* info) {
827 info->set_type(FileType::File);
828 info->set_size(static_cast<int64_t>(obj.GetContentLength()));
829 info->set_mtime(FromAwsDatetime(obj.GetLastModified()));
830 }
831
FileObjectToInfo(const S3Model::Object & obj,FileInfo * info)832 void FileObjectToInfo(const S3Model::Object& obj, FileInfo* info) {
833 info->set_type(FileType::File);
834 info->set_size(static_cast<int64_t>(obj.GetSize()));
835 info->set_mtime(FromAwsDatetime(obj.GetLastModified()));
836 }
837
838 } // namespace
839
840 class S3FileSystem::Impl {
841 public:
842 S3Options options_;
843 Aws::Client::ClientConfiguration client_config_;
844 Aws::Auth::AWSCredentials credentials_;
845 std::unique_ptr<Aws::S3::S3Client> client_;
846
847 const int32_t kListObjectsMaxKeys = 1000;
848 // At most 1000 keys per multiple-delete request
849 const int32_t kMultipleDeleteMaxKeys = 1000;
850 // Limit recursing depth, since a recursion bomb can be created
851 const int32_t kMaxNestingDepth = 100;
852
  // Store the options; the S3 client itself is created in Init().
  explicit Impl(S3Options options) : options_(std::move(options)) {}
854
  // Build the AWS client configuration and instantiate the S3 client.
  // Must succeed before any other method is used.
  Status Init() {
    // Resolve credentials once, eagerly
    credentials_ = options_.credentials_provider->GetAWSCredentials();
    client_config_.region = ToAwsString(options_.region);
    client_config_.endpointOverride = ToAwsString(options_.endpoint_override);
    if (options_.scheme == "http") {
      client_config_.scheme = Aws::Http::Scheme::HTTP;
    } else if (options_.scheme == "https") {
      client_config_.scheme = Aws::Http::Scheme::HTTPS;
    } else {
      return Status::Invalid("Invalid S3 connection scheme '", options_.scheme, "'");
    }
    // Custom retry strategy (see s3_internal.h) — presumably adds retries
    // on connection errors; confirm against ConnectRetryStrategy.
    client_config_.retryStrategy = std::make_shared<ConnectRetryStrategy>();
    // Only use virtual-host addressing when talking to real AWS endpoints;
    // a custom endpoint (e.g. minio) gets path-style addressing.
    bool use_virtual_addressing = options_.endpoint_override.empty();
    client_.reset(
        new Aws::S3::S3Client(credentials_, client_config_,
                              Aws::Client::AWSAuthV4Signer::PayloadSigningPolicy::Never,
                              use_virtual_addressing));
    return Status::OK();
  }
874
  // Return a copy of the options this filesystem was constructed with.
  S3Options options() const { return options_; }
876
877 // Create a bucket. Successful if bucket already exists.
CreateBucket(const std::string & bucket)878 Status CreateBucket(const std::string& bucket) {
879 S3Model::CreateBucketConfiguration config;
880 S3Model::CreateBucketRequest req;
881 config.SetLocationConstraint(
882 S3Model::BucketLocationConstraintMapper::GetBucketLocationConstraintForName(
883 ToAwsString(options_.region)));
884 req.SetBucket(ToAwsString(bucket));
885 req.SetCreateBucketConfiguration(config);
886
887 auto outcome = client_->CreateBucket(req);
888 if (!outcome.IsSuccess() && !IsAlreadyExists(outcome.GetError())) {
889 return ErrorToStatus(std::forward_as_tuple("When creating bucket '", bucket, "': "),
890 outcome.GetError());
891 }
892 return Status::OK();
893 }
894
895 // Create an object with empty contents. Successful if object already exists.
CreateEmptyObject(const std::string & bucket,const std::string & key)896 Status CreateEmptyObject(const std::string& bucket, const std::string& key) {
897 S3Model::PutObjectRequest req;
898 req.SetBucket(ToAwsString(bucket));
899 req.SetKey(ToAwsString(key));
900 return OutcomeToStatus(
901 std::forward_as_tuple("When creating key '", key, "' in bucket '", bucket, "': "),
902 client_->PutObject(req));
903 }
904
CreateEmptyDir(const std::string & bucket,const std::string & key)905 Status CreateEmptyDir(const std::string& bucket, const std::string& key) {
906 DCHECK(!key.empty());
907 return CreateEmptyObject(bucket, key + kSep);
908 }
909
DeleteObject(const std::string & bucket,const std::string & key)910 Status DeleteObject(const std::string& bucket, const std::string& key) {
911 S3Model::DeleteObjectRequest req;
912 req.SetBucket(ToAwsString(bucket));
913 req.SetKey(ToAwsString(key));
914 return OutcomeToStatus(
915 std::forward_as_tuple("When delete key '", key, "' in bucket '", bucket, "': "),
916 client_->DeleteObject(req));
917 }
918
CopyObject(const S3Path & src_path,const S3Path & dest_path)919 Status CopyObject(const S3Path& src_path, const S3Path& dest_path) {
920 S3Model::CopyObjectRequest req;
921 req.SetBucket(ToAwsString(dest_path.bucket));
922 req.SetKey(ToAwsString(dest_path.key));
923 // Copy source "Must be URL-encoded" according to AWS SDK docs.
924 req.SetCopySource(src_path.ToURLEncodedAwsString());
925 return OutcomeToStatus(
926 std::forward_as_tuple("When copying key '", src_path.key, "' in bucket '",
927 src_path.bucket, "' to key '", dest_path.key,
928 "' in bucket '", dest_path.bucket, "': "),
929 client_->CopyObject(req));
930 }
931
932 // On Minio, an empty "directory" doesn't satisfy the same API requests as
933 // a non-empty "directory". This is a Minio-specific quirk, but we need
934 // to handle it for unit testing.
935
IsEmptyDirectory(const std::string & bucket,const std::string & key,bool * out)936 Status IsEmptyDirectory(const std::string& bucket, const std::string& key, bool* out) {
937 S3Model::HeadObjectRequest req;
938 req.SetBucket(ToAwsString(bucket));
939 req.SetKey(ToAwsString(key) + kSep);
940
941 auto outcome = client_->HeadObject(req);
942 if (outcome.IsSuccess()) {
943 *out = true;
944 return Status::OK();
945 }
946 if (IsNotFound(outcome.GetError())) {
947 *out = false;
948 return Status::OK();
949 }
950 return ErrorToStatus(std::forward_as_tuple("When reading information for key '", key,
951 "' in bucket '", bucket, "': "),
952 outcome.GetError());
953 }
954
IsEmptyDirectory(const S3Path & path,bool * out)955 Status IsEmptyDirectory(const S3Path& path, bool* out) {
956 return IsEmptyDirectory(path.bucket, path.key, out);
957 }
958
IsNonEmptyDirectory(const S3Path & path,bool * out)959 Status IsNonEmptyDirectory(const S3Path& path, bool* out) {
960 S3Model::ListObjectsV2Request req;
961 req.SetBucket(ToAwsString(path.bucket));
962 req.SetPrefix(ToAwsString(path.key) + kSep);
963 req.SetDelimiter(Aws::String() + kSep);
964 req.SetMaxKeys(1);
965 auto outcome = client_->ListObjectsV2(req);
966 if (outcome.IsSuccess()) {
967 *out = outcome.GetResult().GetKeyCount() > 0;
968 return Status::OK();
969 }
970 if (IsNotFound(outcome.GetError())) {
971 *out = false;
972 return Status::OK();
973 }
974 return ErrorToStatus(
975 std::forward_as_tuple("When listing objects under key '", path.key,
976 "' in bucket '", path.bucket, "': "),
977 outcome.GetError());
978 }
979
980 // List objects under a given prefix, issuing continuation requests if necessary
981 template <typename ResultCallable, typename ErrorCallable>
ListObjectsV2(const std::string & bucket,const std::string & prefix,ResultCallable && result_callable,ErrorCallable && error_callable)982 Status ListObjectsV2(const std::string& bucket, const std::string& prefix,
983 ResultCallable&& result_callable, ErrorCallable&& error_callable) {
984 S3Model::ListObjectsV2Request req;
985 req.SetBucket(ToAwsString(bucket));
986 if (!prefix.empty()) {
987 req.SetPrefix(ToAwsString(prefix) + kSep);
988 }
989 req.SetDelimiter(Aws::String() + kSep);
990 req.SetMaxKeys(kListObjectsMaxKeys);
991
992 while (true) {
993 auto outcome = client_->ListObjectsV2(req);
994 if (!outcome.IsSuccess()) {
995 return error_callable(outcome.GetError());
996 }
997 const auto& result = outcome.GetResult();
998 RETURN_NOT_OK(result_callable(result));
999 // Was the result limited by max-keys? If so, use the continuation token
1000 // to fetch further results.
1001 if (!result.GetIsTruncated()) {
1002 break;
1003 }
1004 DCHECK(!result.GetNextContinuationToken().empty());
1005 req.SetContinuationToken(result.GetNextContinuationToken());
1006 }
1007 return Status::OK();
1008 }
1009
1010 // Recursive workhorse for GetTargetStats(FileSelector...)
Walk(const FileSelector & select,const std::string & bucket,const std::string & key,std::vector<FileInfo> * out)1011 Status Walk(const FileSelector& select, const std::string& bucket,
1012 const std::string& key, std::vector<FileInfo>* out) {
1013 int32_t nesting_depth = 0;
1014 return Walk(select, bucket, key, nesting_depth, out);
1015 }
1016
Walk(const FileSelector & select,const std::string & bucket,const std::string & key,int32_t nesting_depth,std::vector<FileInfo> * out)1017 Status Walk(const FileSelector& select, const std::string& bucket,
1018 const std::string& key, int32_t nesting_depth, std::vector<FileInfo>* out) {
1019 if (nesting_depth >= kMaxNestingDepth) {
1020 return Status::IOError("S3 filesystem tree exceeds maximum nesting depth (",
1021 kMaxNestingDepth, ")");
1022 }
1023
1024 bool is_empty = true;
1025 std::vector<std::string> child_keys;
1026
1027 auto handle_results = [&](const S3Model::ListObjectsV2Result& result) -> Status {
1028 // Walk "files"
1029 for (const auto& obj : result.GetContents()) {
1030 is_empty = false;
1031 FileInfo info;
1032 const auto child_key = internal::RemoveTrailingSlash(FromAwsString(obj.GetKey()));
1033 if (child_key == util::string_view(key)) {
1034 // Amazon can return the "directory" key itself as part of the results, skip
1035 continue;
1036 }
1037 std::stringstream child_path;
1038 child_path << bucket << kSep << child_key;
1039 info.set_path(child_path.str());
1040 FileObjectToInfo(obj, &info);
1041 out->push_back(std::move(info));
1042 }
1043 // Walk "directories"
1044 for (const auto& prefix : result.GetCommonPrefixes()) {
1045 is_empty = false;
1046 const auto child_key =
1047 internal::RemoveTrailingSlash(FromAwsString(prefix.GetPrefix()));
1048 std::stringstream ss;
1049 ss << bucket << kSep << child_key;
1050 FileInfo info;
1051 info.set_path(ss.str());
1052 info.set_type(FileType::Directory);
1053 out->push_back(std::move(info));
1054 if (select.recursive) {
1055 child_keys.emplace_back(child_key);
1056 }
1057 }
1058 return Status::OK();
1059 };
1060
1061 auto handle_error = [&](const AWSError<S3Errors>& error) -> Status {
1062 if (select.allow_not_found && IsNotFound(error)) {
1063 return Status::OK();
1064 }
1065 return ErrorToStatus(std::forward_as_tuple("When listing objects under key '", key,
1066 "' in bucket '", bucket, "': "),
1067 error);
1068 };
1069
1070 RETURN_NOT_OK(
1071 ListObjectsV2(bucket, key, std::move(handle_results), std::move(handle_error)));
1072
1073 // Recurse
1074 if (select.recursive && nesting_depth < select.max_recursion) {
1075 for (const auto& child_key : child_keys) {
1076 RETURN_NOT_OK(Walk(select, bucket, child_key, nesting_depth + 1, out));
1077 }
1078 }
1079
1080 // If no contents were found, perhaps it's an empty "directory",
1081 // or perhaps it's a nonexistent entry. Check.
1082 if (is_empty && !select.allow_not_found) {
1083 RETURN_NOT_OK(IsEmptyDirectory(bucket, key, &is_empty));
1084 if (!is_empty) {
1085 return PathNotFound(bucket, key);
1086 }
1087 }
1088 return Status::OK();
1089 }
1090
WalkForDeleteDir(const std::string & bucket,const std::string & key,std::vector<std::string> * file_keys,std::vector<std::string> * dir_keys)1091 Status WalkForDeleteDir(const std::string& bucket, const std::string& key,
1092 std::vector<std::string>* file_keys,
1093 std::vector<std::string>* dir_keys) {
1094 int32_t nesting_depth = 0;
1095 return WalkForDeleteDir(bucket, key, nesting_depth, file_keys, dir_keys);
1096 }
1097
WalkForDeleteDir(const std::string & bucket,const std::string & key,int32_t nesting_depth,std::vector<std::string> * file_keys,std::vector<std::string> * dir_keys)1098 Status WalkForDeleteDir(const std::string& bucket, const std::string& key,
1099 int32_t nesting_depth, std::vector<std::string>* file_keys,
1100 std::vector<std::string>* dir_keys) {
1101 if (nesting_depth >= kMaxNestingDepth) {
1102 return Status::IOError("S3 filesystem tree exceeds maximum nesting depth (",
1103 kMaxNestingDepth, ")");
1104 }
1105
1106 std::vector<std::string> child_keys;
1107
1108 auto handle_results = [&](const S3Model::ListObjectsV2Result& result) -> Status {
1109 // Walk "files"
1110 for (const auto& obj : result.GetContents()) {
1111 file_keys->emplace_back(FromAwsString(obj.GetKey()));
1112 }
1113 // Walk "directories"
1114 for (const auto& prefix : result.GetCommonPrefixes()) {
1115 auto child_key = FromAwsString(prefix.GetPrefix());
1116 dir_keys->emplace_back(child_key);
1117 child_keys.emplace_back(internal::RemoveTrailingSlash(child_key));
1118 }
1119 return Status::OK();
1120 };
1121
1122 auto handle_error = [&](const AWSError<S3Errors>& error) -> Status {
1123 return ErrorToStatus(std::forward_as_tuple("When listing objects under key '", key,
1124 "' in bucket '", bucket, "': "),
1125 error);
1126 };
1127
1128 RETURN_NOT_OK(
1129 ListObjectsV2(bucket, key, std::move(handle_results), std::move(handle_error)));
1130
1131 // Recurse
1132 for (const auto& child_key : child_keys) {
1133 RETURN_NOT_OK(
1134 WalkForDeleteDir(bucket, child_key, nesting_depth + 1, file_keys, dir_keys));
1135 }
1136 return Status::OK();
1137 }
1138
1139 // Delete multiple objects at once
DeleteObjects(const std::string & bucket,const std::vector<std::string> & keys)1140 Status DeleteObjects(const std::string& bucket, const std::vector<std::string>& keys) {
1141 const auto chunk_size = static_cast<size_t>(kMultipleDeleteMaxKeys);
1142 for (size_t start = 0; start < keys.size(); start += chunk_size) {
1143 S3Model::DeleteObjectsRequest req;
1144 S3Model::Delete del;
1145 for (size_t i = start; i < std::min(keys.size(), chunk_size); ++i) {
1146 del.AddObjects(S3Model::ObjectIdentifier().WithKey(ToAwsString(keys[i])));
1147 }
1148 req.SetBucket(ToAwsString(bucket));
1149 req.SetDelete(std::move(del));
1150 auto outcome = client_->DeleteObjects(req);
1151 if (!outcome.IsSuccess()) {
1152 return ErrorToStatus(outcome.GetError());
1153 }
1154 // Also need to check per-key errors, even on successful outcome
1155 // See
1156 // https://docs.aws.amazon.com/fr_fr/AmazonS3/latest/API/multiobjectdeleteapi.html
1157 const auto& errors = outcome.GetResult().GetErrors();
1158 if (!errors.empty()) {
1159 std::stringstream ss;
1160 ss << "Got the following " << errors.size()
1161 << " errors when deleting objects in S3 bucket '" << bucket << "':\n";
1162 for (const auto& error : errors) {
1163 ss << "- key '" << error.GetKey() << "': " << error.GetMessage() << "\n";
1164 }
1165 return Status::IOError(ss.str());
1166 }
1167 }
1168 return Status::OK();
1169 }
1170
DeleteDirContents(const std::string & bucket,const std::string & key)1171 Status DeleteDirContents(const std::string& bucket, const std::string& key) {
1172 std::vector<std::string> file_keys;
1173 std::vector<std::string> dir_keys;
1174 RETURN_NOT_OK(WalkForDeleteDir(bucket, key, &file_keys, &dir_keys));
1175 if (file_keys.empty() && dir_keys.empty() && !key.empty()) {
1176 // No contents found, is it an empty directory?
1177 bool exists = false;
1178 RETURN_NOT_OK(IsEmptyDirectory(bucket, key, &exists));
1179 if (!exists) {
1180 return PathNotFound(bucket, key);
1181 }
1182 }
1183 // First delete all "files", then delete all child "directories"
1184 RETURN_NOT_OK(DeleteObjects(bucket, file_keys));
1185 // Delete directories in reverse lexicographic order, to ensure children
1186 // are deleted before their parents (Minio).
1187 std::sort(dir_keys.rbegin(), dir_keys.rend());
1188 return DeleteObjects(bucket, dir_keys);
1189 }
1190
EnsureDirectoryExists(const S3Path & path)1191 Status EnsureDirectoryExists(const S3Path& path) {
1192 if (!path.key.empty()) {
1193 return CreateEmptyDir(path.bucket, path.key);
1194 }
1195 return Status::OK();
1196 }
1197
EnsureParentExists(const S3Path & path)1198 Status EnsureParentExists(const S3Path& path) {
1199 if (path.has_parent()) {
1200 return EnsureDirectoryExists(path.parent());
1201 }
1202 return Status::OK();
1203 }
1204
ListBuckets(std::vector<std::string> * out)1205 Status ListBuckets(std::vector<std::string>* out) {
1206 out->clear();
1207 auto outcome = client_->ListBuckets();
1208 if (!outcome.IsSuccess()) {
1209 return ErrorToStatus(std::forward_as_tuple("When listing buckets: "),
1210 outcome.GetError());
1211 }
1212 for (const auto& bucket : outcome.GetResult().GetBuckets()) {
1213 out->emplace_back(FromAwsString(bucket.GetName()));
1214 }
1215 return Status::OK();
1216 }
1217 };
1218
S3FileSystem(const S3Options & options)1219 S3FileSystem::S3FileSystem(const S3Options& options) : impl_(new Impl{options}) {}
1220
~S3FileSystem()1221 S3FileSystem::~S3FileSystem() {}
1222
Make(const S3Options & options)1223 Result<std::shared_ptr<S3FileSystem>> S3FileSystem::Make(const S3Options& options) {
1224 RETURN_NOT_OK(CheckS3Initialized());
1225
1226 std::shared_ptr<S3FileSystem> ptr(new S3FileSystem(options));
1227 RETURN_NOT_OK(ptr->impl_->Init());
1228 return ptr;
1229 }
1230
Equals(const FileSystem & other) const1231 bool S3FileSystem::Equals(const FileSystem& other) const {
1232 if (this == &other) {
1233 return true;
1234 }
1235 if (other.type_name() != type_name()) {
1236 return false;
1237 }
1238 const auto& s3fs = ::arrow::internal::checked_cast<const S3FileSystem&>(other);
1239 return options().Equals(s3fs.options());
1240 }
1241
options() const1242 S3Options S3FileSystem::options() const { return impl_->options(); }
1243
// Stat a single path. The probe order matters:
//  - root ""            -> always a Directory
//  - bucket only        -> HeadBucket
//  - bucket + key       -> HeadObject, then (on 404) empty-directory marker,
//                          then non-empty-directory prefix listing.
Result<FileInfo> S3FileSystem::GetFileInfo(const std::string& s) {
  S3Path path;
  RETURN_NOT_OK(S3Path::FromString(s, &path));
  FileInfo info;
  info.set_path(s);

  if (path.empty()) {
    // It's the root path ""
    info.set_type(FileType::Directory);
    return info;
  } else if (path.key.empty()) {
    // It's a bucket
    S3Model::HeadBucketRequest req;
    req.SetBucket(ToAwsString(path.bucket));

    auto outcome = impl_->client_->HeadBucket(req);
    if (!outcome.IsSuccess()) {
      // Only a not-found error maps to FileType::NotFound; anything else
      // (e.g. access denied, network failure) is surfaced as an error.
      if (!IsNotFound(outcome.GetError())) {
        return ErrorToStatus(
            std::forward_as_tuple("When getting information for bucket '", path.bucket,
                                  "': "),
            outcome.GetError());
      }
      info.set_type(FileType::NotFound);
      return info;
    }
    // NOTE: S3 doesn't have a bucket modification time. Only a creation
    // time is available, and you have to list all buckets to get it.
    info.set_type(FileType::Directory);
    return info;
  } else {
    // It's an object
    S3Model::HeadObjectRequest req;
    req.SetBucket(ToAwsString(path.bucket));
    req.SetKey(ToAwsString(path.key));

    auto outcome = impl_->client_->HeadObject(req);
    if (outcome.IsSuccess()) {
      // "File" object found
      FileObjectToInfo(outcome.GetResult(), &info);
      return info;
    }
    if (!IsNotFound(outcome.GetError())) {
      return ErrorToStatus(
          std::forward_as_tuple("When getting information for key '", path.key,
                                "' in bucket '", path.bucket, "': "),
          outcome.GetError());
    }
    // Not found => perhaps it's an empty "directory"
    bool is_dir = false;
    RETURN_NOT_OK(impl_->IsEmptyDirectory(path, &is_dir));
    if (is_dir) {
      info.set_type(FileType::Directory);
      return info;
    }
    // Not found => perhaps it's a non-empty "directory"
    RETURN_NOT_OK(impl_->IsNonEmptyDirectory(path, &is_dir));
    if (is_dir) {
      info.set_type(FileType::Directory);
    } else {
      info.set_type(FileType::NotFound);
    }
    return info;
  }
}
1309
GetFileInfo(const FileSelector & select)1310 Result<std::vector<FileInfo>> S3FileSystem::GetFileInfo(const FileSelector& select) {
1311 S3Path base_path;
1312 RETURN_NOT_OK(S3Path::FromString(select.base_dir, &base_path));
1313
1314 std::vector<FileInfo> results;
1315
1316 if (base_path.empty()) {
1317 // List all buckets
1318 std::vector<std::string> buckets;
1319 RETURN_NOT_OK(impl_->ListBuckets(&buckets));
1320 for (const auto& bucket : buckets) {
1321 FileInfo info;
1322 info.set_path(bucket);
1323 info.set_type(FileType::Directory);
1324 results.push_back(std::move(info));
1325 if (select.recursive) {
1326 RETURN_NOT_OK(impl_->Walk(select, bucket, "", &results));
1327 }
1328 }
1329 return results;
1330 }
1331
1332 // Nominal case -> walk a single bucket
1333 RETURN_NOT_OK(impl_->Walk(select, base_path.bucket, base_path.key, &results));
1334 return results;
1335 }
1336
// Create a "directory": a bucket when the path has no key, otherwise an
// empty marker object ending in '/'. In recursive mode the bucket and all
// intermediate markers are created too; in non-recursive mode the parent
// must already exist.
Status S3FileSystem::CreateDir(const std::string& s, bool recursive) {
  S3Path path;
  RETURN_NOT_OK(S3Path::FromString(s, &path));

  if (path.key.empty()) {
    // Create bucket
    return impl_->CreateBucket(path.bucket);
  }

  // Create object
  if (recursive) {
    // Ensure bucket exists
    RETURN_NOT_OK(impl_->CreateBucket(path.bucket));
    // Ensure that all parents exist, then the directory itself
    std::string parent_key;
    for (const auto& part : path.key_parts) {
      parent_key += part;
      parent_key += kSep;
      RETURN_NOT_OK(impl_->CreateEmptyObject(path.bucket, parent_key));
    }
    return Status::OK();
  } else {
    // Check parent dir exists
    if (path.has_parent()) {
      S3Path parent_path = path.parent();
      bool exists;
      // A parent may exist either as a prefix with contents or as an
      // explicit empty-directory marker; check both in turn.
      RETURN_NOT_OK(impl_->IsNonEmptyDirectory(parent_path, &exists));
      if (!exists) {
        RETURN_NOT_OK(impl_->IsEmptyDirectory(parent_path, &exists));
      }
      if (!exists) {
        return Status::IOError("Cannot create directory '", path.full_path,
                               "': parent directory does not exist");
      }
    }

    // XXX Should we check that no non-directory entry exists?
    // Minio does it for us, not sure about other S3 implementations.
    return impl_->CreateEmptyDir(path.bucket, path.key);
  }
}
1378
// Delete a "directory" and its contents: the whole bucket when the path has
// no key, otherwise the key's subtree plus its marker object.
Status S3FileSystem::DeleteDir(const std::string& s) {
  S3Path path;
  RETURN_NOT_OK(S3Path::FromString(s, &path));

  if (path.empty()) {
    return Status::NotImplemented("Cannot delete all S3 buckets");
  }
  // Empty out the subtree first in both cases.
  RETURN_NOT_OK(impl_->DeleteDirContents(path.bucket, path.key));
  if (path.key.empty()) {
    // Delete bucket
    S3Model::DeleteBucketRequest req;
    req.SetBucket(ToAwsString(path.bucket));
    return OutcomeToStatus(
        std::forward_as_tuple("When deleting bucket '", path.bucket, "': "),
        impl_->client_->DeleteBucket(req));
  } else {
    // Delete "directory"
    RETURN_NOT_OK(impl_->DeleteObject(path.bucket, path.key + kSep));
    // Parent may be implicitly deleted if it became empty, recreate it
    return impl_->EnsureParentExists(path);
  }
}
1401
DeleteDirContents(const std::string & s)1402 Status S3FileSystem::DeleteDirContents(const std::string& s) {
1403 S3Path path;
1404 RETURN_NOT_OK(S3Path::FromString(s, &path));
1405
1406 if (path.empty()) {
1407 return Status::NotImplemented("Cannot delete all S3 buckets");
1408 }
1409 RETURN_NOT_OK(impl_->DeleteDirContents(path.bucket, path.key));
1410 // Directory may be implicitly deleted, recreate it
1411 return impl_->EnsureDirectoryExists(path);
1412 }
1413
// Delete a single object. A HEAD request is issued first so that deleting a
// nonexistent file reports PathNotFound (S3's DeleteObject succeeds even
// for missing keys).
Status S3FileSystem::DeleteFile(const std::string& s) {
  S3Path path;
  RETURN_NOT_OK(S3Path::FromString(s, &path));
  RETURN_NOT_OK(ValidateFilePath(path));

  // Check the object exists
  S3Model::HeadObjectRequest req;
  req.SetBucket(ToAwsString(path.bucket));
  req.SetKey(ToAwsString(path.key));

  auto outcome = impl_->client_->HeadObject(req);
  if (!outcome.IsSuccess()) {
    if (IsNotFound(outcome.GetError())) {
      return PathNotFound(path);
    } else {
      return ErrorToStatus(
          std::forward_as_tuple("When getting information for key '", path.key,
                                "' in bucket '", path.bucket, "': "),
          outcome.GetError());
    }
  }
  // Object found, delete it
  RETURN_NOT_OK(impl_->DeleteObject(path.bucket, path.key));
  // Parent may be implicitly deleted if it became empty, recreate it
  return impl_->EnsureParentExists(path);
}
1440
Move(const std::string & src,const std::string & dest)1441 Status S3FileSystem::Move(const std::string& src, const std::string& dest) {
1442 // XXX We don't implement moving directories as it would be too expensive:
1443 // one must copy all directory contents one by one (including object data),
1444 // then delete the original contents.
1445
1446 S3Path src_path, dest_path;
1447 RETURN_NOT_OK(S3Path::FromString(src, &src_path));
1448 RETURN_NOT_OK(ValidateFilePath(src_path));
1449 RETURN_NOT_OK(S3Path::FromString(dest, &dest_path));
1450 RETURN_NOT_OK(ValidateFilePath(dest_path));
1451
1452 if (src_path == dest_path) {
1453 return Status::OK();
1454 }
1455 RETURN_NOT_OK(impl_->CopyObject(src_path, dest_path));
1456 RETURN_NOT_OK(impl_->DeleteObject(src_path.bucket, src_path.key));
1457 // Source parent may be implicitly deleted if it became empty, recreate it
1458 return impl_->EnsureParentExists(src_path);
1459 }
1460
CopyFile(const std::string & src,const std::string & dest)1461 Status S3FileSystem::CopyFile(const std::string& src, const std::string& dest) {
1462 S3Path src_path, dest_path;
1463 RETURN_NOT_OK(S3Path::FromString(src, &src_path));
1464 RETURN_NOT_OK(ValidateFilePath(src_path));
1465 RETURN_NOT_OK(S3Path::FromString(dest, &dest_path));
1466 RETURN_NOT_OK(ValidateFilePath(dest_path));
1467
1468 if (src_path == dest_path) {
1469 return Status::OK();
1470 }
1471 return impl_->CopyObject(src_path, dest_path);
1472 }
1473
OpenInputStream(const std::string & s)1474 Result<std::shared_ptr<io::InputStream>> S3FileSystem::OpenInputStream(
1475 const std::string& s) {
1476 S3Path path;
1477 RETURN_NOT_OK(S3Path::FromString(s, &path));
1478 RETURN_NOT_OK(ValidateFilePath(path));
1479
1480 auto ptr = std::make_shared<ObjectInputFile>(impl_->client_.get(), path);
1481 RETURN_NOT_OK(ptr->Init());
1482 return ptr;
1483 }
1484
OpenInputFile(const std::string & s)1485 Result<std::shared_ptr<io::RandomAccessFile>> S3FileSystem::OpenInputFile(
1486 const std::string& s) {
1487 S3Path path;
1488 RETURN_NOT_OK(S3Path::FromString(s, &path));
1489 RETURN_NOT_OK(ValidateFilePath(path));
1490
1491 auto ptr = std::make_shared<ObjectInputFile>(impl_->client_.get(), path);
1492 RETURN_NOT_OK(ptr->Init());
1493 return ptr;
1494 }
1495
OpenOutputStream(const std::string & s)1496 Result<std::shared_ptr<io::OutputStream>> S3FileSystem::OpenOutputStream(
1497 const std::string& s) {
1498 S3Path path;
1499 RETURN_NOT_OK(S3Path::FromString(s, &path));
1500 RETURN_NOT_OK(ValidateFilePath(path));
1501
1502 auto ptr =
1503 std::make_shared<ObjectOutputStream>(impl_->client_.get(), path, impl_->options_);
1504 RETURN_NOT_OK(ptr->Init());
1505 return ptr;
1506 }
1507
// Appending is unsupported: S3 objects are immutable once written.
Result<std::shared_ptr<io::OutputStream>> S3FileSystem::OpenAppendStream(
    const std::string& path) {
  // XXX Investigate UploadPartCopy? Does it work with source == destination?
  // https://docs.aws.amazon.com/AmazonS3/latest/API/mpUploadUploadPartCopy.html
  // (but would need to fall back to GET if the current data is < 5 MB)
  return Status::NotImplemented("It is not possible to append efficiently to S3 objects");
}
1515
1516 } // namespace fs
1517 } // namespace arrow
1518