// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

#include "arrow/filesystem/s3fs.h"

#include <algorithm>
#include <atomic>
#include <condition_variable>
#include <mutex>
#include <sstream>
#include <unordered_map>
#include <utility>

#ifdef _WIN32
// Undefine preprocessor macros that interfere with AWS function / method names
#ifdef GetMessage
#undef GetMessage
#endif
#ifdef GetObject
#undef GetObject
#endif
#endif

#include <aws/core/Aws.h>
#include <aws/core/auth/AWSCredentials.h>
#include <aws/core/auth/AWSCredentialsProviderChain.h>
#include <aws/core/client/RetryStrategy.h>
#include <aws/core/utils/logging/ConsoleLogSystem.h>
#include <aws/core/utils/stream/PreallocatedStreamBuf.h>
#include <aws/s3/S3Client.h>
#include <aws/s3/model/AbortMultipartUploadRequest.h>
#include <aws/s3/model/CompleteMultipartUploadRequest.h>
#include <aws/s3/model/CompletedMultipartUpload.h>
#include <aws/s3/model/CompletedPart.h>
#include <aws/s3/model/CopyObjectRequest.h>
#include <aws/s3/model/CreateBucketRequest.h>
#include <aws/s3/model/CreateMultipartUploadRequest.h>
#include <aws/s3/model/DeleteBucketRequest.h>
#include <aws/s3/model/DeleteObjectRequest.h>
#include <aws/s3/model/DeleteObjectsRequest.h>
#include <aws/s3/model/GetObjectRequest.h>
#include <aws/s3/model/HeadBucketRequest.h>
#include <aws/s3/model/HeadObjectRequest.h>
#include <aws/s3/model/ListBucketsResult.h>
#include <aws/s3/model/ListObjectsV2Request.h>
#include <aws/s3/model/PutObjectRequest.h>
#include <aws/s3/model/UploadPartRequest.h>

#include "arrow/buffer.h"
#include "arrow/filesystem/filesystem.h"
#include "arrow/filesystem/path_util.h"
#include "arrow/filesystem/s3_internal.h"
#include "arrow/io/interfaces.h"
#include "arrow/io/memory.h"
#include "arrow/io/util_internal.h"
#include "arrow/result.h"
#include "arrow/status.h"
#include "arrow/util/checked_cast.h"
#include "arrow/util/logging.h"
#include "arrow/util/windows_fixup.h"

namespace arrow {

using internal::Uri;

namespace fs {

using ::Aws::Client::AWSError;
using ::Aws::S3::S3Errors;
namespace S3Model = Aws::S3::Model;

using ::arrow::fs::internal::ConnectRetryStrategy;
using ::arrow::fs::internal::ErrorToStatus;
using ::arrow::fs::internal::FromAwsDatetime;
using ::arrow::fs::internal::FromAwsString;
using ::arrow::fs::internal::IsAlreadyExists;
using ::arrow::fs::internal::IsNotFound;
using ::arrow::fs::internal::OutcomeToResult;
using ::arrow::fs::internal::OutcomeToStatus;
using ::arrow::fs::internal::ToAwsString;
using ::arrow::fs::internal::ToURLEncodedAwsString;

const char* kS3DefaultRegion = "us-east-1";

static const char kSep = '/';

namespace {

std::mutex aws_init_lock;
Aws::SDKOptions aws_options;
std::atomic<bool> aws_initialized(false);

Status DoInitializeS3(const S3GlobalOptions& options) {
  Aws::Utils::Logging::LogLevel aws_log_level;

#define LOG_LEVEL_CASE(level_name)                             \
  case S3LogLevel::level_name:                                 \
    aws_log_level = Aws::Utils::Logging::LogLevel::level_name; \
    break;

  switch (options.log_level) {
    LOG_LEVEL_CASE(Fatal)
    LOG_LEVEL_CASE(Error)
    LOG_LEVEL_CASE(Warn)
    LOG_LEVEL_CASE(Info)
    LOG_LEVEL_CASE(Debug)
    LOG_LEVEL_CASE(Trace)
    default:
      aws_log_level = Aws::Utils::Logging::LogLevel::Off;
  }

#undef LOG_LEVEL_CASE

  aws_options.loggingOptions.logLevel = aws_log_level;
  // By default the AWS SDK logs to files, log to console instead
  aws_options.loggingOptions.logger_create_fn = [] {
    return std::make_shared<Aws::Utils::Logging::ConsoleLogSystem>(
        aws_options.loggingOptions.logLevel);
  };
  Aws::InitAPI(aws_options);
  aws_initialized.store(true);
  return Status::OK();
}

}  // namespace

Status InitializeS3(const S3GlobalOptions& options) {
  std::lock_guard<std::mutex> lock(aws_init_lock);
  return DoInitializeS3(options);
}

Status FinalizeS3() {
  std::lock_guard<std::mutex> lock(aws_init_lock);
  Aws::ShutdownAPI(aws_options);
  aws_initialized.store(false);
  return Status::OK();
}

Status EnsureS3Initialized() {
  std::lock_guard<std::mutex> lock(aws_init_lock);
  if (!aws_initialized.load()) {
    S3GlobalOptions options{S3LogLevel::Fatal};
    return DoInitializeS3(options);
  }
  return Status::OK();
}
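
// A minimal sketch of typical initialization and teardown (the surrounding
// application code is hypothetical):
//
//   S3GlobalOptions global_options{S3LogLevel::Warn};
//   ARROW_RETURN_NOT_OK(InitializeS3(global_options));
//   // ... create and use S3 filesystems ...
//   ARROW_RETURN_NOT_OK(FinalizeS3());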

void S3Options::ConfigureDefaultCredentials() {
  credentials_provider =
      std::make_shared<Aws::Auth::DefaultAWSCredentialsProviderChain>();
}

void S3Options::ConfigureAccessKey(const std::string& access_key,
                                   const std::string& secret_key) {
  credentials_provider = std::make_shared<Aws::Auth::SimpleAWSCredentialsProvider>(
      ToAwsString(access_key), ToAwsString(secret_key));
}

std::string S3Options::GetAccessKey() const {
  auto credentials = credentials_provider->GetAWSCredentials();
  return std::string(FromAwsString(credentials.GetAWSAccessKeyId()));
}

std::string S3Options::GetSecretKey() const {
  auto credentials = credentials_provider->GetAWSCredentials();
  return std::string(FromAwsString(credentials.GetAWSSecretKey()));
}

S3Options S3Options::Defaults() {
  S3Options options;
  options.ConfigureDefaultCredentials();
  return options;
}

S3Options S3Options::FromAccessKey(const std::string& access_key,
                                   const std::string& secret_key) {
  S3Options options;
  options.ConfigureAccessKey(access_key, secret_key);
  return options;
}

Result<S3Options> S3Options::FromUri(const Uri& uri, std::string* out_path) {
  S3Options options;

  const auto bucket = uri.host();
  auto path = uri.path();
  if (bucket.empty()) {
    if (!path.empty()) {
      return Status::Invalid("Missing bucket name in S3 URI");
    }
  } else {
    if (path.empty()) {
      path = bucket;
    } else {
      if (path[0] != '/') {
        return Status::Invalid("S3 URI should be absolute, not relative");
      }
      path = bucket + path;
    }
  }
  if (out_path != nullptr) {
    *out_path = std::string(internal::RemoveTrailingSlash(path));
  }

  std::unordered_map<std::string, std::string> options_map;
  ARROW_ASSIGN_OR_RAISE(const auto options_items, uri.query_items());
  for (const auto& kv : options_items) {
    options_map.emplace(kv.first, kv.second);
  }

  const auto username = uri.username();
  if (!username.empty()) {
    options.ConfigureAccessKey(username, uri.password());
  } else {
    options.ConfigureDefaultCredentials();
  }

  auto it = options_map.find("region");
  if (it != options_map.end()) {
    options.region = it->second;
  }
  it = options_map.find("scheme");
  if (it != options_map.end()) {
    options.scheme = it->second;
  }
  it = options_map.find("endpoint_override");
  if (it != options_map.end()) {
    options.endpoint_override = it->second;
  }

  return options;
}

Result<S3Options> S3Options::FromUri(const std::string& uri_string,
                                     std::string* out_path) {
  Uri uri;
  RETURN_NOT_OK(uri.Parse(uri_string));
  return FromUri(uri, out_path);
}
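
// A sketch of what FromUri accepts (the bucket, credentials and region here
// are made up):
//
//   std::string path;
//   ARROW_ASSIGN_OR_RAISE(
//       auto options,
//       S3Options::FromUri("s3://access:secret@my-bucket/dir/file?region=us-west-2",
//                          &path));
//   // options.region == "us-west-2"; path == "my-bucket/dir/file"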

bool S3Options::Equals(const S3Options& other) const {
  return (region == other.region && endpoint_override == other.endpoint_override &&
          scheme == other.scheme && background_writes == other.background_writes &&
          GetAccessKey() == other.GetAccessKey() &&
          GetSecretKey() == other.GetSecretKey());
}

namespace {

Status CheckS3Initialized() {
  if (!aws_initialized.load()) {
    return Status::Invalid(
        "S3 subsystem not initialized; please call InitializeS3() "
        "before carrying out any S3-related operation");
  }
  return Status::OK();
}

// XXX Sanitize paths by removing leading slash?

struct S3Path {
  std::string full_path;
  std::string bucket;
  std::string key;
  std::vector<std::string> key_parts;

  static Status FromString(const std::string& s, S3Path* out) {
    const auto src = internal::RemoveTrailingSlash(s);
    auto first_sep = src.find_first_of(kSep);
    if (first_sep == 0) {
      return Status::Invalid("Path cannot start with a separator ('", s, "')");
    }
    if (first_sep == std::string::npos) {
      *out = {std::string(src), std::string(src), "", {}};
      return Status::OK();
    }
    out->full_path = std::string(src);
    out->bucket = std::string(src.substr(0, first_sep));
    out->key = std::string(src.substr(first_sep + 1));
    out->key_parts = internal::SplitAbstractPath(out->key);
    return Validate(out);
  }

  static Status Validate(S3Path* path) {
    auto result = internal::ValidateAbstractPathParts(path->key_parts);
    if (!result.ok()) {
      return Status::Invalid(result.message(), " in path ", path->full_path);
    } else {
      return result;
    }
  }

  Aws::String ToURLEncodedAwsString() const {
    // URL-encode individual parts, not the '/' separator
    Aws::String res;
    res += internal::ToURLEncodedAwsString(bucket);
    for (const auto& part : key_parts) {
      res += kSep;
      res += internal::ToURLEncodedAwsString(part);
    }
    return res;
  }

  S3Path parent() const {
    DCHECK(!key_parts.empty());
    auto parent = S3Path{"", bucket, "", key_parts};
    parent.key_parts.pop_back();
    parent.key = internal::JoinAbstractPath(parent.key_parts);
    parent.full_path = parent.bucket + kSep + parent.key;
    return parent;
  }

  bool has_parent() const { return !key.empty(); }

  bool empty() const { return bucket.empty() && key.empty(); }

  bool operator==(const S3Path& other) const {
    return bucket == other.bucket && key == other.key;
  }
};
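
// For example (a sketch; the path is made up):
//
//   S3Path path;
//   RETURN_NOT_OK(S3Path::FromString("my-bucket/a/b/c", &path));
//   // path.bucket == "my-bucket"; path.key == "a/b/c";
//   // path.key_parts == {"a", "b", "c"}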

// XXX return in OutcomeToStatus instead?
Status PathNotFound(const S3Path& path) {
  return Status::IOError("Path does not exist '", path.full_path, "'");
}

Status PathNotFound(const std::string& bucket, const std::string& key) {
  return Status::IOError("Path does not exist '", bucket, kSep, key, "'");
}

Status NotAFile(const S3Path& path) {
  return Status::IOError("Not a regular file: '", path.full_path, "'");
}

Status ValidateFilePath(const S3Path& path) {
  if (path.bucket.empty() || path.key.empty()) {
    return NotAFile(path);
  }
  return Status::OK();
}

std::string FormatRange(int64_t start, int64_t length) {
  // Format a HTTP range header value
  std::stringstream ss;
  ss << "bytes=" << start << "-" << start + length - 1;
  return ss.str();
}
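
// For instance, FormatRange(10, 5) returns "bytes=10-14": HTTP byte ranges
// are inclusive at both ends.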

// A non-copying iostream.
// See https://stackoverflow.com/questions/35322033/aws-c-sdk-uploadpart-times-out
// https://stackoverflow.com/questions/13059091/creating-an-input-stream-from-constant-memory
class StringViewStream : Aws::Utils::Stream::PreallocatedStreamBuf, public std::iostream {
 public:
  StringViewStream(const void* data, int64_t nbytes)
      : Aws::Utils::Stream::PreallocatedStreamBuf(
            reinterpret_cast<unsigned char*>(const_cast<void*>(data)),
            static_cast<size_t>(nbytes)),
        std::iostream(this) {}
};
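
// Usage sketch (assuming `req` is some AWS SDK request object that accepts a
// stream body, e.g. an UploadPartRequest):
//
//   std::string payload = "some body bytes";
//   req.SetBody(std::make_shared<StringViewStream>(payload.data(), payload.size()));
//   // `payload` must outlive the request; the stream does not copy it.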

// By default, the AWS SDK reads object data into an auto-growing StringStream.
// To avoid copies, read directly into our preallocated buffer instead.
// See https://github.com/aws/aws-sdk-cpp/issues/64 for an alternative but
// functionally similar recipe.
Aws::IOStreamFactory AwsWriteableStreamFactory(void* data, int64_t nbytes) {
  return [=]() { return new StringViewStream(data, nbytes); };
}

Result<S3Model::GetObjectResult> GetObjectRange(Aws::S3::S3Client* client,
                                                const S3Path& path, int64_t start,
                                                int64_t length, void* out) {
  S3Model::GetObjectRequest req;
  req.SetBucket(ToAwsString(path.bucket));
  req.SetKey(ToAwsString(path.key));
  req.SetRange(ToAwsString(FormatRange(start, length)));
  req.SetResponseStreamFactory(AwsWriteableStreamFactory(out, length));
  return OutcomeToResult(client->GetObject(req));
}

// A RandomAccessFile that reads from an S3 object
class ObjectInputFile : public io::RandomAccessFile {
 public:
  ObjectInputFile(Aws::S3::S3Client* client, const S3Path& path)
      : client_(client), path_(path) {}

  Status Init() {
    // Issue a HEAD Object to get the content-length and ensure any
    // errors (e.g. file not found) don't wait until the first Read() call.
    S3Model::HeadObjectRequest req;
    req.SetBucket(ToAwsString(path_.bucket));
    req.SetKey(ToAwsString(path_.key));

    auto outcome = client_->HeadObject(req);
    if (!outcome.IsSuccess()) {
      if (IsNotFound(outcome.GetError())) {
        return PathNotFound(path_);
      } else {
        return ErrorToStatus(
            std::forward_as_tuple("When reading information for key '", path_.key,
                                  "' in bucket '", path_.bucket, "': "),
            outcome.GetError());
      }
    }
    content_length_ = outcome.GetResult().GetContentLength();
    DCHECK_GE(content_length_, 0);
    return Status::OK();
  }

  Status CheckClosed() const {
    if (closed_) {
      return Status::Invalid("Operation on closed stream");
    }
    return Status::OK();
  }

  Status CheckPosition(int64_t position, const char* action) const {
    if (position < 0) {
      return Status::Invalid("Cannot ", action, " from negative position");
    }
    if (position > content_length_) {
      return Status::IOError("Cannot ", action, " past end of file");
    }
    return Status::OK();
  }

  // RandomAccessFile APIs

  Status Close() override {
    closed_ = true;
    return Status::OK();
  }

  bool closed() const override { return closed_; }

  Result<int64_t> Tell() const override {
    RETURN_NOT_OK(CheckClosed());
    return pos_;
  }

  Result<int64_t> GetSize() override {
    RETURN_NOT_OK(CheckClosed());
    return content_length_;
  }

  Status Seek(int64_t position) override {
    RETURN_NOT_OK(CheckClosed());
    RETURN_NOT_OK(CheckPosition(position, "seek"));

    pos_ = position;
    return Status::OK();
  }

  Result<int64_t> ReadAt(int64_t position, int64_t nbytes, void* out) override {
    RETURN_NOT_OK(CheckClosed());
    RETURN_NOT_OK(CheckPosition(position, "read"));

    nbytes = std::min(nbytes, content_length_ - position);
    if (nbytes == 0) {
      return 0;
    }

    // Read the desired range of bytes
    ARROW_ASSIGN_OR_RAISE(S3Model::GetObjectResult result,
                          GetObjectRange(client_, path_, position, nbytes, out));

    auto& stream = result.GetBody();
    stream.ignore(nbytes);
    // NOTE: the stream is a stringstream by default, there is no actual error
    // to check for.  However, stream.fail() may return true if EOF is reached.
    return stream.gcount();
  }

  Result<std::shared_ptr<Buffer>> ReadAt(int64_t position, int64_t nbytes) override {
    RETURN_NOT_OK(CheckClosed());
    RETURN_NOT_OK(CheckPosition(position, "read"));

    // No need to allocate more than the remaining number of bytes
    nbytes = std::min(nbytes, content_length_ - position);

    ARROW_ASSIGN_OR_RAISE(auto buf, AllocateResizableBuffer(nbytes));
    if (nbytes > 0) {
      ARROW_ASSIGN_OR_RAISE(int64_t bytes_read,
                            ReadAt(position, nbytes, buf->mutable_data()));
      DCHECK_LE(bytes_read, nbytes);
      RETURN_NOT_OK(buf->Resize(bytes_read));
    }
    return std::move(buf);
  }

  Result<int64_t> Read(int64_t nbytes, void* out) override {
    ARROW_ASSIGN_OR_RAISE(int64_t bytes_read, ReadAt(pos_, nbytes, out));
    pos_ += bytes_read;
    return bytes_read;
  }

  Result<std::shared_ptr<Buffer>> Read(int64_t nbytes) override {
    ARROW_ASSIGN_OR_RAISE(auto buffer, ReadAt(pos_, nbytes));
    pos_ += buffer->size();
    return std::move(buffer);
  }

 protected:
  Aws::S3::S3Client* client_;
  S3Path path_;
  bool closed_ = false;
  int64_t pos_ = 0;
  int64_t content_length_ = -1;
};

// Minimum size for each part of a multipart upload, except for the last part.
// AWS doc says "5 MB" but it's not clear whether those are MB or MiB,
// so I chose the safer value.
// (see https://docs.aws.amazon.com/AmazonS3/latest/API/mpUploadUploadPart.html)
static constexpr int64_t kMinimumPartUpload = 5 * 1024 * 1024;

// An OutputStream that writes to an S3 object
class ObjectOutputStream : public io::OutputStream {
 protected:
  struct UploadState;

 public:
  ObjectOutputStream(Aws::S3::S3Client* client, const S3Path& path,
                     const S3Options& options)
      : client_(client), path_(path), options_(options) {}

  ~ObjectOutputStream() override {
    // For compliance with the rest of the IO stack, Close rather than Abort,
    // even though it may be more expensive.
    io::internal::CloseFromDestructor(this);
  }

  Status Init() {
    // Initiate the multi-part upload
    S3Model::CreateMultipartUploadRequest req;
    req.SetBucket(ToAwsString(path_.bucket));
    req.SetKey(ToAwsString(path_.key));

    auto outcome = client_->CreateMultipartUpload(req);
    if (!outcome.IsSuccess()) {
      return ErrorToStatus(
          std::forward_as_tuple("When initiating multiple part upload for key '",
                                path_.key, "' in bucket '", path_.bucket, "': "),
          outcome.GetError());
    }
    upload_id_ = outcome.GetResult().GetUploadId();
    upload_state_ = std::make_shared<UploadState>();
    closed_ = false;
    return Status::OK();
  }

  Status Abort() override {
    if (closed_) {
      return Status::OK();
    }

    S3Model::AbortMultipartUploadRequest req;
    req.SetBucket(ToAwsString(path_.bucket));
    req.SetKey(ToAwsString(path_.key));
    req.SetUploadId(upload_id_);

    auto outcome = client_->AbortMultipartUpload(req);
    if (!outcome.IsSuccess()) {
      return ErrorToStatus(
          std::forward_as_tuple("When aborting multiple part upload for key '", path_.key,
                                "' in bucket '", path_.bucket, "': "),
          outcome.GetError());
    }
    current_part_.reset();
    closed_ = true;
    return Status::OK();
  }

  // OutputStream interface

  Status Close() override {
    if (closed_) {
      return Status::OK();
    }

    if (current_part_) {
      // Upload last part
      RETURN_NOT_OK(CommitCurrentPart());
    }

    // S3 mandates at least one part, upload an empty one if necessary
    if (part_number_ == 1) {
      RETURN_NOT_OK(UploadPart("", 0));
    }

    // Wait for in-progress uploads to finish (if async writes are enabled)
    RETURN_NOT_OK(Flush());

    // At this point, all part uploads have finished successfully
    DCHECK_GT(part_number_, 1);
    DCHECK_EQ(upload_state_->completed_parts.size(),
              static_cast<size_t>(part_number_ - 1));

    S3Model::CompletedMultipartUpload completed_upload;
    completed_upload.SetParts(upload_state_->completed_parts);
    S3Model::CompleteMultipartUploadRequest req;
    req.SetBucket(ToAwsString(path_.bucket));
    req.SetKey(ToAwsString(path_.key));
    req.SetUploadId(upload_id_);
    req.SetMultipartUpload(std::move(completed_upload));

    auto outcome = client_->CompleteMultipartUpload(req);
    if (!outcome.IsSuccess()) {
      return ErrorToStatus(
          std::forward_as_tuple("When completing multiple part upload for key '",
                                path_.key, "' in bucket '", path_.bucket, "': "),
          outcome.GetError());
    }

    closed_ = true;
    return Status::OK();
  }

  bool closed() const override { return closed_; }

  Result<int64_t> Tell() const override {
    if (closed_) {
      return Status::Invalid("Operation on closed stream");
    }
    return pos_;
  }

  Status Write(const std::shared_ptr<Buffer>& buffer) override {
    return DoWrite(buffer->data(), buffer->size(), buffer);
  }

  Status Write(const void* data, int64_t nbytes) override {
    return DoWrite(data, nbytes);
  }

  Status DoWrite(const void* data, int64_t nbytes,
                 std::shared_ptr<Buffer> owned_buffer = nullptr) {
    if (closed_) {
      return Status::Invalid("Operation on closed stream");
    }

    if (!current_part_ && nbytes >= part_upload_threshold_) {
      // No current part and data large enough, upload it directly
      // (without copying if the buffer is owned)
      RETURN_NOT_OK(UploadPart(data, nbytes, owned_buffer));
      pos_ += nbytes;
      return Status::OK();
    }
    // Can't upload data on its own, need to buffer it
    if (!current_part_) {
      ARROW_ASSIGN_OR_RAISE(current_part_,
                            io::BufferOutputStream::Create(part_upload_threshold_));
      current_part_size_ = 0;
    }
    RETURN_NOT_OK(current_part_->Write(data, nbytes));
    pos_ += nbytes;
    current_part_size_ += nbytes;

    if (current_part_size_ >= part_upload_threshold_) {
      // Current part large enough, upload it
      RETURN_NOT_OK(CommitCurrentPart());
    }

    return Status::OK();
  }

  Status Flush() override {
    if (closed_) {
      return Status::Invalid("Operation on closed stream");
    }
    // Wait for background writes to finish
    std::unique_lock<std::mutex> lock(upload_state_->mutex);
    upload_state_->cv.wait(lock,
                           [this]() { return upload_state_->parts_in_progress == 0; });
    return upload_state_->status;
  }

  // Upload-related helpers

  Status CommitCurrentPart() {
    ARROW_ASSIGN_OR_RAISE(auto buf, current_part_->Finish());
    current_part_.reset();
    current_part_size_ = 0;
    return UploadPart(buf);
  }

  Status UploadPart(std::shared_ptr<Buffer> buffer) {
    return UploadPart(buffer->data(), buffer->size(), buffer);
  }

  Status UploadPart(const void* data, int64_t nbytes,
                    std::shared_ptr<Buffer> owned_buffer = nullptr) {
    S3Model::UploadPartRequest req;
    req.SetBucket(ToAwsString(path_.bucket));
    req.SetKey(ToAwsString(path_.key));
    req.SetUploadId(upload_id_);
    req.SetPartNumber(part_number_);
    req.SetContentLength(nbytes);

    if (!options_.background_writes) {
      req.SetBody(std::make_shared<StringViewStream>(data, nbytes));
      auto outcome = client_->UploadPart(req);
      if (!outcome.IsSuccess()) {
        return UploadPartError(req, outcome);
      } else {
        AddCompletedPart(upload_state_, part_number_, outcome.GetResult());
      }
    } else {
      std::unique_lock<std::mutex> lock(upload_state_->mutex);
      auto state = upload_state_;  // Keep upload state alive in closure
      auto part_number = part_number_;

      // If the data isn't owned, make an immutable copy for the lifetime of the closure
      if (owned_buffer == nullptr) {
        ARROW_ASSIGN_OR_RAISE(owned_buffer, AllocateBuffer(nbytes));
        memcpy(owned_buffer->mutable_data(), data, nbytes);
      } else {
        DCHECK_EQ(data, owned_buffer->data());
        DCHECK_EQ(nbytes, owned_buffer->size());
      }
      req.SetBody(
          std::make_shared<StringViewStream>(owned_buffer->data(), owned_buffer->size()));

      auto handler =
          [state, owned_buffer, part_number](
              const Aws::S3::S3Client*, const S3Model::UploadPartRequest& req,
              const S3Model::UploadPartOutcome& outcome,
              const std::shared_ptr<const Aws::Client::AsyncCallerContext>&) -> void {
        std::unique_lock<std::mutex> lock(state->mutex);
        if (!outcome.IsSuccess()) {
          state->status &= UploadPartError(req, outcome);
        } else {
          AddCompletedPart(state, part_number, outcome.GetResult());
        }
        // Notify completion, regardless of success / error status
        if (--state->parts_in_progress == 0) {
          state->cv.notify_all();
        }
      };
      ++upload_state_->parts_in_progress;
      client_->UploadPartAsync(req, handler);
    }

    ++part_number_;
    // With up to 10000 parts in an upload (S3 limit), a stream writing chunks
    // of exactly 5MB would be limited to 50GB total.  To avoid that, we bump
    // the upload threshold every 100 parts.  So the pattern is:
    // - part 1 to 99: 5MB threshold
    // - part 100 to 199: 10MB threshold
    // - part 200 to 299: 15MB threshold
    // ...
    // - part 9900 to 9999: 500MB threshold
    // So the total size limit is about 2525000MB or ~2.4TB, while keeping
    // manageable chunk sizes and avoiding too much buffering in the common
    // case of a small-ish stream.  If the limit's not enough, we can revisit.
    if (part_number_ % 100 == 0) {
      part_upload_threshold_ += kMinimumPartUpload;
    }

    return Status::OK();
  }

  static void AddCompletedPart(const std::shared_ptr<UploadState>& state, int part_number,
                               const S3Model::UploadPartResult& result) {
    S3Model::CompletedPart part;
    // Append ETag and part number for this uploaded part
    // (will be needed for upload completion in Close())
    part.SetPartNumber(part_number);
    part.SetETag(result.GetETag());
    int slot = part_number - 1;
    if (state->completed_parts.size() <= static_cast<size_t>(slot)) {
      state->completed_parts.resize(slot + 1);
    }
    DCHECK(!state->completed_parts[slot].PartNumberHasBeenSet());
    state->completed_parts[slot] = std::move(part);
  }

  static Status UploadPartError(const S3Model::UploadPartRequest& req,
                                const S3Model::UploadPartOutcome& outcome) {
    return ErrorToStatus(
        std::forward_as_tuple("When uploading part for key '", req.GetKey(),
                              "' in bucket '", req.GetBucket(), "': "),
        outcome.GetError());
  }

 protected:
  Aws::S3::S3Client* client_;
  S3Path path_;
  const S3Options& options_;
  Aws::String upload_id_;
  bool closed_ = true;
  int64_t pos_ = 0;
  int32_t part_number_ = 1;
  std::shared_ptr<io::BufferOutputStream> current_part_;
  int64_t current_part_size_ = 0;
  int64_t part_upload_threshold_ = kMinimumPartUpload;

  // This struct is kept alive through background writes to avoid problems
  // in the completion handler.
  struct UploadState {
    std::mutex mutex;
    std::condition_variable cv;
    Aws::Vector<S3Model::CompletedPart> completed_parts;
    int64_t parts_in_progress = 0;
    Status status;

    UploadState() : status(Status::OK()) {}
  };
  std::shared_ptr<UploadState> upload_state_;
};
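
// Typical use of ObjectOutputStream goes through the public filesystem API;
// a minimal sketch (`fs`, `data` and `nbytes` are hypothetical):
//
//   ARROW_ASSIGN_OR_RAISE(auto stream, fs->OpenOutputStream("bucket/key"));
//   ARROW_RETURN_NOT_OK(stream->Write(data, nbytes));
//   ARROW_RETURN_NOT_OK(stream->Close());  // completes the multipart upload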

// This function assumes info->path() is already set
void FileObjectToInfo(const S3Model::HeadObjectResult& obj, FileInfo* info) {
  info->set_type(FileType::File);
  info->set_size(static_cast<int64_t>(obj.GetContentLength()));
  info->set_mtime(FromAwsDatetime(obj.GetLastModified()));
}

void FileObjectToInfo(const S3Model::Object& obj, FileInfo* info) {
  info->set_type(FileType::File);
  info->set_size(static_cast<int64_t>(obj.GetSize()));
  info->set_mtime(FromAwsDatetime(obj.GetLastModified()));
}

}  // namespace

class S3FileSystem::Impl {
 public:
  S3Options options_;
  Aws::Client::ClientConfiguration client_config_;
  Aws::Auth::AWSCredentials credentials_;
  std::unique_ptr<Aws::S3::S3Client> client_;

  const int32_t kListObjectsMaxKeys = 1000;
  // At most 1000 keys per multiple-delete request
  const int32_t kMultipleDeleteMaxKeys = 1000;
  // Limit recursion depth, since a recursion bomb can be created
  const int32_t kMaxNestingDepth = 100;

  explicit Impl(S3Options options) : options_(std::move(options)) {}

  Status Init() {
    credentials_ = options_.credentials_provider->GetAWSCredentials();
    client_config_.region = ToAwsString(options_.region);
    client_config_.endpointOverride = ToAwsString(options_.endpoint_override);
    if (options_.scheme == "http") {
      client_config_.scheme = Aws::Http::Scheme::HTTP;
    } else if (options_.scheme == "https") {
      client_config_.scheme = Aws::Http::Scheme::HTTPS;
    } else {
      return Status::Invalid("Invalid S3 connection scheme '", options_.scheme, "'");
    }
    client_config_.retryStrategy = std::make_shared<ConnectRetryStrategy>();
    bool use_virtual_addressing = options_.endpoint_override.empty();
    client_.reset(
        new Aws::S3::S3Client(credentials_, client_config_,
                              Aws::Client::AWSAuthV4Signer::PayloadSigningPolicy::Never,
                              use_virtual_addressing));
    return Status::OK();
  }

  S3Options options() const { return options_; }

  // Create a bucket.  Successful if bucket already exists.
  Status CreateBucket(const std::string& bucket) {
    S3Model::CreateBucketConfiguration config;
    S3Model::CreateBucketRequest req;
    config.SetLocationConstraint(
        S3Model::BucketLocationConstraintMapper::GetBucketLocationConstraintForName(
            ToAwsString(options_.region)));
    req.SetBucket(ToAwsString(bucket));
    req.SetCreateBucketConfiguration(config);

    auto outcome = client_->CreateBucket(req);
    if (!outcome.IsSuccess() && !IsAlreadyExists(outcome.GetError())) {
      return ErrorToStatus(std::forward_as_tuple("When creating bucket '", bucket, "': "),
                           outcome.GetError());
    }
    return Status::OK();
  }

  // Create an object with empty contents.  Successful if object already exists.
  Status CreateEmptyObject(const std::string& bucket, const std::string& key) {
    S3Model::PutObjectRequest req;
    req.SetBucket(ToAwsString(bucket));
    req.SetKey(ToAwsString(key));
    return OutcomeToStatus(
        std::forward_as_tuple("When creating key '", key, "' in bucket '", bucket, "': "),
        client_->PutObject(req));
  }

  Status CreateEmptyDir(const std::string& bucket, const std::string& key) {
    DCHECK(!key.empty());
    return CreateEmptyObject(bucket, key + kSep);
  }

  Status DeleteObject(const std::string& bucket, const std::string& key) {
    S3Model::DeleteObjectRequest req;
    req.SetBucket(ToAwsString(bucket));
    req.SetKey(ToAwsString(key));
    return OutcomeToStatus(
        std::forward_as_tuple("When deleting key '", key, "' in bucket '", bucket, "': "),
        client_->DeleteObject(req));
  }

  Status CopyObject(const S3Path& src_path, const S3Path& dest_path) {
    S3Model::CopyObjectRequest req;
    req.SetBucket(ToAwsString(dest_path.bucket));
    req.SetKey(ToAwsString(dest_path.key));
    // Copy source "Must be URL-encoded" according to AWS SDK docs.
    req.SetCopySource(src_path.ToURLEncodedAwsString());
    return OutcomeToStatus(
        std::forward_as_tuple("When copying key '", src_path.key, "' in bucket '",
                              src_path.bucket, "' to key '", dest_path.key,
                              "' in bucket '", dest_path.bucket, "': "),
        client_->CopyObject(req));
  }

  // On Minio, an empty "directory" doesn't satisfy the same API requests as
  // a non-empty "directory".  This is a Minio-specific quirk, but we need
  // to handle it for unit testing.

  Status IsEmptyDirectory(const std::string& bucket, const std::string& key, bool* out) {
    S3Model::HeadObjectRequest req;
    req.SetBucket(ToAwsString(bucket));
    req.SetKey(ToAwsString(key) + kSep);

    auto outcome = client_->HeadObject(req);
    if (outcome.IsSuccess()) {
      *out = true;
      return Status::OK();
    }
    if (IsNotFound(outcome.GetError())) {
      *out = false;
      return Status::OK();
    }
    return ErrorToStatus(std::forward_as_tuple("When reading information for key '", key,
                                               "' in bucket '", bucket, "': "),
                         outcome.GetError());
  }

  Status IsEmptyDirectory(const S3Path& path, bool* out) {
    return IsEmptyDirectory(path.bucket, path.key, out);
  }

  Status IsNonEmptyDirectory(const S3Path& path, bool* out) {
    S3Model::ListObjectsV2Request req;
    req.SetBucket(ToAwsString(path.bucket));
    req.SetPrefix(ToAwsString(path.key) + kSep);
    req.SetDelimiter(Aws::String() + kSep);
    req.SetMaxKeys(1);
    auto outcome = client_->ListObjectsV2(req);
    if (outcome.IsSuccess()) {
      *out = outcome.GetResult().GetKeyCount() > 0;
      return Status::OK();
    }
    if (IsNotFound(outcome.GetError())) {
      *out = false;
      return Status::OK();
    }
    return ErrorToStatus(
        std::forward_as_tuple("When listing objects under key '", path.key,
                              "' in bucket '", path.bucket, "': "),
        outcome.GetError());
  }

  // List objects under a given prefix, issuing continuation requests if necessary
  template <typename ResultCallable, typename ErrorCallable>
  Status ListObjectsV2(const std::string& bucket, const std::string& prefix,
                       ResultCallable&& result_callable, ErrorCallable&& error_callable) {
    S3Model::ListObjectsV2Request req;
    req.SetBucket(ToAwsString(bucket));
    if (!prefix.empty()) {
      req.SetPrefix(ToAwsString(prefix) + kSep);
    }
    req.SetDelimiter(Aws::String() + kSep);
    req.SetMaxKeys(kListObjectsMaxKeys);

    while (true) {
      auto outcome = client_->ListObjectsV2(req);
      if (!outcome.IsSuccess()) {
        return error_callable(outcome.GetError());
      }
      const auto& result = outcome.GetResult();
      RETURN_NOT_OK(result_callable(result));
      // Was the result limited by max-keys? If so, use the continuation token
      // to fetch further results.
      if (!result.GetIsTruncated()) {
        break;
      }
      DCHECK(!result.GetNextContinuationToken().empty());
      req.SetContinuationToken(result.GetNextContinuationToken());
    }
    return Status::OK();
  }

  // Recursive workhorse for GetTargetStats(FileSelector...)
  Status Walk(const FileSelector& select, const std::string& bucket,
              const std::string& key, std::vector<FileInfo>* out) {
    int32_t nesting_depth = 0;
    return Walk(select, bucket, key, nesting_depth, out);
  }

  Status Walk(const FileSelector& select, const std::string& bucket,
              const std::string& key, int32_t nesting_depth, std::vector<FileInfo>* out) {
    if (nesting_depth >= kMaxNestingDepth) {
      return Status::IOError("S3 filesystem tree exceeds maximum nesting depth (",
                             kMaxNestingDepth, ")");
    }

    bool is_empty = true;
    std::vector<std::string> child_keys;

    auto handle_results = [&](const S3Model::ListObjectsV2Result& result) -> Status {
      // Walk "files"
      for (const auto& obj : result.GetContents()) {
        is_empty = false;
        FileInfo info;
        const auto child_key = internal::RemoveTrailingSlash(FromAwsString(obj.GetKey()));
        if (child_key == util::string_view(key)) {
          // Amazon can return the "directory" key itself as part of the results, skip
          continue;
        }
        std::stringstream child_path;
        child_path << bucket << kSep << child_key;
        info.set_path(child_path.str());
        FileObjectToInfo(obj, &info);
        out->push_back(std::move(info));
      }
      // Walk "directories"
      for (const auto& prefix : result.GetCommonPrefixes()) {
        is_empty = false;
        const auto child_key =
            internal::RemoveTrailingSlash(FromAwsString(prefix.GetPrefix()));
        std::stringstream ss;
        ss << bucket << kSep << child_key;
        FileInfo info;
        info.set_path(ss.str());
        info.set_type(FileType::Directory);
        out->push_back(std::move(info));
        if (select.recursive) {
          child_keys.emplace_back(child_key);
        }
      }
      return Status::OK();
    };

    auto handle_error = [&](const AWSError<S3Errors>& error) -> Status {
      if (select.allow_not_found && IsNotFound(error)) {
        return Status::OK();
      }
      return ErrorToStatus(std::forward_as_tuple("When listing objects under key '", key,
                                                 "' in bucket '", bucket, "': "),
                           error);
    };

    RETURN_NOT_OK(
        ListObjectsV2(bucket, key, std::move(handle_results), std::move(handle_error)));

    // Recurse
    if (select.recursive && nesting_depth < select.max_recursion) {
      for (const auto& child_key : child_keys) {
        RETURN_NOT_OK(Walk(select, bucket, child_key, nesting_depth + 1, out));
      }
    }

    // If no contents were found, perhaps it's an empty "directory",
    // or perhaps it's a nonexistent entry.  Check.
    if (is_empty && !select.allow_not_found) {
      RETURN_NOT_OK(IsEmptyDirectory(bucket, key, &is_empty));
      if (!is_empty) {
        return PathNotFound(bucket, key);
      }
    }
    return Status::OK();
  }

  Status WalkForDeleteDir(const std::string& bucket, const std::string& key,
                          std::vector<std::string>* file_keys,
                          std::vector<std::string>* dir_keys) {
    int32_t nesting_depth = 0;
    return WalkForDeleteDir(bucket, key, nesting_depth, file_keys, dir_keys);
  }

  Status WalkForDeleteDir(const std::string& bucket, const std::string& key,
                          int32_t nesting_depth, std::vector<std::string>* file_keys,
                          std::vector<std::string>* dir_keys) {
    if (nesting_depth >= kMaxNestingDepth) {
      return Status::IOError("S3 filesystem tree exceeds maximum nesting depth (",
                             kMaxNestingDepth, ")");
    }

    std::vector<std::string> child_keys;

    auto handle_results = [&](const S3Model::ListObjectsV2Result& result) -> Status {
      // Walk "files"
      for (const auto& obj : result.GetContents()) {
        file_keys->emplace_back(FromAwsString(obj.GetKey()));
      }
      // Walk "directories"
      for (const auto& prefix : result.GetCommonPrefixes()) {
        auto child_key = FromAwsString(prefix.GetPrefix());
        dir_keys->emplace_back(child_key);
        child_keys.emplace_back(internal::RemoveTrailingSlash(child_key));
      }
      return Status::OK();
    };

    auto handle_error = [&](const AWSError<S3Errors>& error) -> Status {
      return ErrorToStatus(std::forward_as_tuple("When listing objects under key '", key,
                                                 "' in bucket '", bucket, "': "),
                           error);
    };

    RETURN_NOT_OK(
        ListObjectsV2(bucket, key, std::move(handle_results), std::move(handle_error)));

    // Recurse
    for (const auto& child_key : child_keys) {
      RETURN_NOT_OK(
          WalkForDeleteDir(bucket, child_key, nesting_depth + 1, file_keys, dir_keys));
    }
    return Status::OK();
  }

  // Delete multiple objects at once
  Status DeleteObjects(const std::string& bucket, const std::vector<std::string>& keys) {
    const auto chunk_size = static_cast<size_t>(kMultipleDeleteMaxKeys);
    for (size_t start = 0; start < keys.size(); start += chunk_size) {
      S3Model::DeleteObjectsRequest req;
      S3Model::Delete del;
      for (size_t i = start; i < std::min(keys.size(), start + chunk_size); ++i) {
        del.AddObjects(S3Model::ObjectIdentifier().WithKey(ToAwsString(keys[i])));
      }
      req.SetBucket(ToAwsString(bucket));
      req.SetDelete(std::move(del));
      auto outcome = client_->DeleteObjects(req);
      if (!outcome.IsSuccess()) {
        return ErrorToStatus(outcome.GetError());
      }
      // Also need to check per-key errors, even on successful outcome
      // See
      // https://docs.aws.amazon.com/fr_fr/AmazonS3/latest/API/multiobjectdeleteapi.html
      const auto& errors = outcome.GetResult().GetErrors();
      if (!errors.empty()) {
        std::stringstream ss;
        ss << "Got the following " << errors.size()
           << " errors when deleting objects in S3 bucket '" << bucket << "':\n";
        for (const auto& error : errors) {
          ss << "- key '" << error.GetKey() << "': " << error.GetMessage() << "\n";
        }
        return Status::IOError(ss.str());
      }
    }
    return Status::OK();
  }

  Status DeleteDirContents(const std::string& bucket, const std::string& key) {
    std::vector<std::string> file_keys;
    std::vector<std::string> dir_keys;
    RETURN_NOT_OK(WalkForDeleteDir(bucket, key, &file_keys, &dir_keys));
    if (file_keys.empty() && dir_keys.empty() && !key.empty()) {
      // No contents found, is it an empty directory?
      bool exists = false;
      RETURN_NOT_OK(IsEmptyDirectory(bucket, key, &exists));
      if (!exists) {
        return PathNotFound(bucket, key);
      }
    }
    // First delete all "files", then delete all child "directories"
    RETURN_NOT_OK(DeleteObjects(bucket, file_keys));
    // Delete directories in reverse lexicographic order, to ensure children
    // are deleted before their parents (Minio).
    std::sort(dir_keys.rbegin(), dir_keys.rend());
    return DeleteObjects(bucket, dir_keys);
  }

  Status EnsureDirectoryExists(const S3Path& path) {
    if (!path.key.empty()) {
      return CreateEmptyDir(path.bucket, path.key);
    }
    return Status::OK();
  }

  Status EnsureParentExists(const S3Path& path) {
    if (path.has_parent()) {
      return EnsureDirectoryExists(path.parent());
    }
    return Status::OK();
  }

  Status ListBuckets(std::vector<std::string>* out) {
    out->clear();
    auto outcome = client_->ListBuckets();
    if (!outcome.IsSuccess()) {
      return ErrorToStatus(std::forward_as_tuple("When listing buckets: "),
                           outcome.GetError());
    }
    for (const auto& bucket : outcome.GetResult().GetBuckets()) {
      out->emplace_back(FromAwsString(bucket.GetName()));
    }
    return Status::OK();
  }
};

S3FileSystem::S3FileSystem(const S3Options& options) : impl_(new Impl{options}) {}

S3FileSystem::~S3FileSystem() {}

Result<std::shared_ptr<S3FileSystem>> S3FileSystem::Make(const S3Options& options) {
  RETURN_NOT_OK(CheckS3Initialized());

  std::shared_ptr<S3FileSystem> ptr(new S3FileSystem(options));
  RETURN_NOT_OK(ptr->impl_->Init());
  return ptr;
}
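
// A minimal sketch of typical use (the bucket, key and credentials are made
// up, and InitializeS3() must already have been called):
//
//   auto options = S3Options::FromAccessKey("access", "secret");
//   options.region = "us-west-2";
//   ARROW_ASSIGN_OR_RAISE(auto fs, S3FileSystem::Make(options));
//   ARROW_ASSIGN_OR_RAISE(auto file, fs->OpenInputFile("my-bucket/my-key"));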

bool S3FileSystem::Equals(const FileSystem& other) const {
  if (this == &other) {
    return true;
  }
  if (other.type_name() != type_name()) {
    return false;
  }
  const auto& s3fs = ::arrow::internal::checked_cast<const S3FileSystem&>(other);
  return options().Equals(s3fs.options());
}

S3Options S3FileSystem::options() const { return impl_->options(); }

Result<FileInfo> S3FileSystem::GetFileInfo(const std::string& s) {
  S3Path path;
  RETURN_NOT_OK(S3Path::FromString(s, &path));
  FileInfo info;
  info.set_path(s);

  if (path.empty()) {
    // It's the root path ""
    info.set_type(FileType::Directory);
    return info;
  } else if (path.key.empty()) {
    // It's a bucket
    S3Model::HeadBucketRequest req;
    req.SetBucket(ToAwsString(path.bucket));

    auto outcome = impl_->client_->HeadBucket(req);
    if (!outcome.IsSuccess()) {
      if (!IsNotFound(outcome.GetError())) {
        return ErrorToStatus(
            std::forward_as_tuple("When getting information for bucket '", path.bucket,
                                  "': "),
            outcome.GetError());
      }
      info.set_type(FileType::NotFound);
      return info;
    }
    // NOTE: S3 doesn't have a bucket modification time.  Only a creation
    // time is available, and you have to list all buckets to get it.
    info.set_type(FileType::Directory);
    return info;
  } else {
    // It's an object
    S3Model::HeadObjectRequest req;
    req.SetBucket(ToAwsString(path.bucket));
    req.SetKey(ToAwsString(path.key));

    auto outcome = impl_->client_->HeadObject(req);
    if (outcome.IsSuccess()) {
      // "File" object found
      FileObjectToInfo(outcome.GetResult(), &info);
      return info;
    }
    if (!IsNotFound(outcome.GetError())) {
      return ErrorToStatus(
          std::forward_as_tuple("When getting information for key '", path.key,
                                "' in bucket '", path.bucket, "': "),
          outcome.GetError());
    }
    // Not found => perhaps it's an empty "directory"
    bool is_dir = false;
    RETURN_NOT_OK(impl_->IsEmptyDirectory(path, &is_dir));
    if (is_dir) {
      info.set_type(FileType::Directory);
      return info;
    }
    // Not found => perhaps it's a non-empty "directory"
    RETURN_NOT_OK(impl_->IsNonEmptyDirectory(path, &is_dir));
    if (is_dir) {
      info.set_type(FileType::Directory);
    } else {
      info.set_type(FileType::NotFound);
    }
    return info;
  }
}

Result<std::vector<FileInfo>> S3FileSystem::GetFileInfo(const FileSelector& select) {
  S3Path base_path;
  RETURN_NOT_OK(S3Path::FromString(select.base_dir, &base_path));

  std::vector<FileInfo> results;

  if (base_path.empty()) {
    // List all buckets
    std::vector<std::string> buckets;
    RETURN_NOT_OK(impl_->ListBuckets(&buckets));
    for (const auto& bucket : buckets) {
      FileInfo info;
      info.set_path(bucket);
      info.set_type(FileType::Directory);
      results.push_back(std::move(info));
      if (select.recursive) {
        RETURN_NOT_OK(impl_->Walk(select, bucket, "", &results));
      }
    }
    return results;
  }

  // Nominal case -> walk a single bucket
  RETURN_NOT_OK(impl_->Walk(select, base_path.bucket, base_path.key, &results));
  return results;
}

Status S3FileSystem::CreateDir(const std::string& s, bool recursive) {
  S3Path path;
  RETURN_NOT_OK(S3Path::FromString(s, &path));

  if (path.key.empty()) {
    // Create bucket
    return impl_->CreateBucket(path.bucket);
  }

  // Create object
  if (recursive) {
    // Ensure bucket exists
    RETURN_NOT_OK(impl_->CreateBucket(path.bucket));
    // Ensure that all parents exist, then the directory itself
    std::string parent_key;
    for (const auto& part : path.key_parts) {
      parent_key += part;
      parent_key += kSep;
      RETURN_NOT_OK(impl_->CreateEmptyObject(path.bucket, parent_key));
    }
    return Status::OK();
  } else {
    // Check parent dir exists
    if (path.has_parent()) {
      S3Path parent_path = path.parent();
      bool exists;
      RETURN_NOT_OK(impl_->IsNonEmptyDirectory(parent_path, &exists));
      if (!exists) {
        RETURN_NOT_OK(impl_->IsEmptyDirectory(parent_path, &exists));
      }
      if (!exists) {
        return Status::IOError("Cannot create directory '", path.full_path,
                               "': parent directory does not exist");
      }
    }

    // XXX Should we check that no non-directory entry exists?
    // Minio does it for us, not sure about other S3 implementations.
    return impl_->CreateEmptyDir(path.bucket, path.key);
  }
}

Status S3FileSystem::DeleteDir(const std::string& s) {
  S3Path path;
  RETURN_NOT_OK(S3Path::FromString(s, &path));

  if (path.empty()) {
    return Status::NotImplemented("Cannot delete all S3 buckets");
  }
  RETURN_NOT_OK(impl_->DeleteDirContents(path.bucket, path.key));
  if (path.key.empty()) {
    // Delete bucket
    S3Model::DeleteBucketRequest req;
    req.SetBucket(ToAwsString(path.bucket));
    return OutcomeToStatus(
        std::forward_as_tuple("When deleting bucket '", path.bucket, "': "),
        impl_->client_->DeleteBucket(req));
  } else {
    // Delete "directory"
    RETURN_NOT_OK(impl_->DeleteObject(path.bucket, path.key + kSep));
    // Parent may be implicitly deleted if it became empty, recreate it
    return impl_->EnsureParentExists(path);
  }
}

Status S3FileSystem::DeleteDirContents(const std::string& s) {
  S3Path path;
  RETURN_NOT_OK(S3Path::FromString(s, &path));

  if (path.empty()) {
    return Status::NotImplemented("Cannot delete all S3 buckets");
  }
  RETURN_NOT_OK(impl_->DeleteDirContents(path.bucket, path.key));
  // Directory may be implicitly deleted, recreate it
  return impl_->EnsureDirectoryExists(path);
}

Status S3FileSystem::DeleteFile(const std::string& s) {
  S3Path path;
  RETURN_NOT_OK(S3Path::FromString(s, &path));
  RETURN_NOT_OK(ValidateFilePath(path));

  // Check the object exists
  S3Model::HeadObjectRequest req;
  req.SetBucket(ToAwsString(path.bucket));
  req.SetKey(ToAwsString(path.key));

  auto outcome = impl_->client_->HeadObject(req);
  if (!outcome.IsSuccess()) {
    if (IsNotFound(outcome.GetError())) {
      return PathNotFound(path);
    } else {
      return ErrorToStatus(
          std::forward_as_tuple("When getting information for key '", path.key,
                                "' in bucket '", path.bucket, "': "),
          outcome.GetError());
    }
  }
  // Object found, delete it
  RETURN_NOT_OK(impl_->DeleteObject(path.bucket, path.key));
  // Parent may be implicitly deleted if it became empty, recreate it
  return impl_->EnsureParentExists(path);
}

Status S3FileSystem::Move(const std::string& src, const std::string& dest) {
  // XXX We don't implement moving directories as it would be too expensive:
  // one must copy all directory contents one by one (including object data),
  // then delete the original contents.

  S3Path src_path, dest_path;
  RETURN_NOT_OK(S3Path::FromString(src, &src_path));
  RETURN_NOT_OK(ValidateFilePath(src_path));
  RETURN_NOT_OK(S3Path::FromString(dest, &dest_path));
  RETURN_NOT_OK(ValidateFilePath(dest_path));

  if (src_path == dest_path) {
    return Status::OK();
  }
  RETURN_NOT_OK(impl_->CopyObject(src_path, dest_path));
  RETURN_NOT_OK(impl_->DeleteObject(src_path.bucket, src_path.key));
  // Source parent may be implicitly deleted if it became empty, recreate it
  return impl_->EnsureParentExists(src_path);
}

Status S3FileSystem::CopyFile(const std::string& src, const std::string& dest) {
  S3Path src_path, dest_path;
  RETURN_NOT_OK(S3Path::FromString(src, &src_path));
  RETURN_NOT_OK(ValidateFilePath(src_path));
  RETURN_NOT_OK(S3Path::FromString(dest, &dest_path));
  RETURN_NOT_OK(ValidateFilePath(dest_path));

  if (src_path == dest_path) {
    return Status::OK();
  }
  return impl_->CopyObject(src_path, dest_path);
}

Result<std::shared_ptr<io::InputStream>> S3FileSystem::OpenInputStream(
    const std::string& s) {
  S3Path path;
  RETURN_NOT_OK(S3Path::FromString(s, &path));
  RETURN_NOT_OK(ValidateFilePath(path));

  auto ptr = std::make_shared<ObjectInputFile>(impl_->client_.get(), path);
  RETURN_NOT_OK(ptr->Init());
  return ptr;
}

Result<std::shared_ptr<io::RandomAccessFile>> S3FileSystem::OpenInputFile(
    const std::string& s) {
  S3Path path;
  RETURN_NOT_OK(S3Path::FromString(s, &path));
  RETURN_NOT_OK(ValidateFilePath(path));

  auto ptr = std::make_shared<ObjectInputFile>(impl_->client_.get(), path);
  RETURN_NOT_OK(ptr->Init());
  return ptr;
}

Result<std::shared_ptr<io::OutputStream>> S3FileSystem::OpenOutputStream(
    const std::string& s) {
  S3Path path;
  RETURN_NOT_OK(S3Path::FromString(s, &path));
  RETURN_NOT_OK(ValidateFilePath(path));

  auto ptr =
      std::make_shared<ObjectOutputStream>(impl_->client_.get(), path, impl_->options_);
  RETURN_NOT_OK(ptr->Init());
  return ptr;
}

Result<std::shared_ptr<io::OutputStream>> S3FileSystem::OpenAppendStream(
    const std::string& path) {
  // XXX Investigate UploadPartCopy? Does it work with source == destination?
  // https://docs.aws.amazon.com/AmazonS3/latest/API/mpUploadUploadPartCopy.html
  // (but would need to fall back to GET if the current data is < 5 MB)
  return Status::NotImplemented("It is not possible to append efficiently to S3 objects");
}

}  // namespace fs
}  // namespace arrow