1 // Licensed to the Apache Software Foundation (ASF) under one
2 // or more contributor license agreements.  See the NOTICE file
3 // distributed with this work for additional information
4 // regarding copyright ownership.  The ASF licenses this file
5 // to you under the Apache License, Version 2.0 (the
6 // "License"); you may not use this file except in compliance
7 // with the License.  You may obtain a copy of the License at
8 //
9 //   http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing,
12 // software distributed under the License is distributed on an
13 // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14 // KIND, either express or implied.  See the License for the
15 // specific language governing permissions and limitations
16 // under the License.
17 
18 #include "arrow/filesystem/s3fs.h"
19 
20 #include <algorithm>
21 #include <atomic>
22 #include <condition_variable>
23 #include <functional>
24 #include <mutex>
25 #include <sstream>
26 #include <unordered_map>
27 #include <utility>
28 
29 #ifdef _WIN32
30 // Undefine preprocessor macros that interfere with AWS function / method names
31 #ifdef GetMessage
32 #undef GetMessage
33 #endif
34 #ifdef GetObject
35 #undef GetObject
36 #endif
37 #endif
38 
39 #include <aws/core/Aws.h>
40 #include <aws/core/Region.h>
41 #include <aws/core/auth/AWSCredentials.h>
42 #include <aws/core/auth/AWSCredentialsProviderChain.h>
43 #include <aws/core/client/RetryStrategy.h>
44 #include <aws/core/http/HttpResponse.h>
45 #include <aws/core/utils/logging/ConsoleLogSystem.h>
46 #include <aws/core/utils/stream/PreallocatedStreamBuf.h>
47 #include <aws/identity-management/auth/STSAssumeRoleCredentialsProvider.h>
48 #include <aws/s3/S3Client.h>
49 #include <aws/s3/model/AbortMultipartUploadRequest.h>
50 #include <aws/s3/model/CompleteMultipartUploadRequest.h>
51 #include <aws/s3/model/CompletedMultipartUpload.h>
52 #include <aws/s3/model/CompletedPart.h>
53 #include <aws/s3/model/CopyObjectRequest.h>
54 #include <aws/s3/model/CreateBucketRequest.h>
55 #include <aws/s3/model/CreateMultipartUploadRequest.h>
56 #include <aws/s3/model/DeleteBucketRequest.h>
57 #include <aws/s3/model/DeleteObjectRequest.h>
58 #include <aws/s3/model/DeleteObjectsRequest.h>
59 #include <aws/s3/model/GetObjectRequest.h>
60 #include <aws/s3/model/HeadBucketRequest.h>
61 #include <aws/s3/model/HeadObjectRequest.h>
62 #include <aws/s3/model/ListBucketsResult.h>
63 #include <aws/s3/model/ListObjectsV2Request.h>
64 #include <aws/s3/model/PutObjectRequest.h>
65 #include <aws/s3/model/UploadPartRequest.h>
66 
67 #include "arrow/buffer.h"
68 #include "arrow/filesystem/filesystem.h"
69 #include "arrow/filesystem/path_util.h"
70 #include "arrow/filesystem/s3_internal.h"
71 #include "arrow/filesystem/util_internal.h"
72 #include "arrow/io/interfaces.h"
73 #include "arrow/io/memory.h"
74 #include "arrow/io/util_internal.h"
75 #include "arrow/result.h"
76 #include "arrow/status.h"
77 #include "arrow/util/atomic_shared_ptr.h"
78 #include "arrow/util/checked_cast.h"
79 #include "arrow/util/future.h"
80 #include "arrow/util/logging.h"
81 #include "arrow/util/optional.h"
82 #include "arrow/util/windows_fixup.h"
83 
84 namespace arrow {
85 
86 using internal::Uri;
87 
88 namespace fs {
89 
90 using ::Aws::Client::AWSError;
91 using ::Aws::S3::S3Errors;
92 namespace S3Model = Aws::S3::Model;
93 
94 using internal::ConnectRetryStrategy;
95 using internal::DetectS3Backend;
96 using internal::ErrorToStatus;
97 using internal::FromAwsDatetime;
98 using internal::FromAwsString;
99 using internal::IsAlreadyExists;
100 using internal::IsNotFound;
101 using internal::OutcomeToResult;
102 using internal::OutcomeToStatus;
103 using internal::S3Backend;
104 using internal::ToAwsString;
105 using internal::ToURLEncodedAwsString;
106 
static const char kSep = '/';

namespace {

// Guards the one-time AWS SDK initialization / shutdown state below.
std::mutex aws_init_lock;
// SDK-wide options; passed to both Aws::InitAPI and Aws::ShutdownAPI.
Aws::SDKOptions aws_options;
// Whether Aws::InitAPI has been called (and not yet shut down).
std::atomic<bool> aws_initialized(false);
114 
DoInitializeS3(const S3GlobalOptions & options)115 Status DoInitializeS3(const S3GlobalOptions& options) {
116   Aws::Utils::Logging::LogLevel aws_log_level;
117 
118 #define LOG_LEVEL_CASE(level_name)                             \
119   case S3LogLevel::level_name:                                 \
120     aws_log_level = Aws::Utils::Logging::LogLevel::level_name; \
121     break;
122 
123   switch (options.log_level) {
124     LOG_LEVEL_CASE(Fatal)
125     LOG_LEVEL_CASE(Error)
126     LOG_LEVEL_CASE(Warn)
127     LOG_LEVEL_CASE(Info)
128     LOG_LEVEL_CASE(Debug)
129     LOG_LEVEL_CASE(Trace)
130     default:
131       aws_log_level = Aws::Utils::Logging::LogLevel::Off;
132   }
133 
134 #undef LOG_LEVEL_CASE
135 
136   aws_options.loggingOptions.logLevel = aws_log_level;
137   // By default the AWS SDK logs to files, log to console instead
138   aws_options.loggingOptions.logger_create_fn = [] {
139     return std::make_shared<Aws::Utils::Logging::ConsoleLogSystem>(
140         aws_options.loggingOptions.logLevel);
141   };
142   Aws::InitAPI(aws_options);
143   aws_initialized.store(true);
144   return Status::OK();
145 }
146 
147 }  // namespace
148 
InitializeS3(const S3GlobalOptions & options)149 Status InitializeS3(const S3GlobalOptions& options) {
150   std::lock_guard<std::mutex> lock(aws_init_lock);
151   return DoInitializeS3(options);
152 }
153 
FinalizeS3()154 Status FinalizeS3() {
155   std::lock_guard<std::mutex> lock(aws_init_lock);
156   Aws::ShutdownAPI(aws_options);
157   aws_initialized.store(false);
158   return Status::OK();
159 }
160 
EnsureS3Initialized()161 Status EnsureS3Initialized() {
162   std::lock_guard<std::mutex> lock(aws_init_lock);
163   if (!aws_initialized.load()) {
164     S3GlobalOptions options{S3LogLevel::Fatal};
165     return DoInitializeS3(options);
166   }
167   return Status::OK();
168 }
169 
170 // -----------------------------------------------------------------------
171 // S3Options implementation
172 
ConfigureDefaultCredentials()173 void S3Options::ConfigureDefaultCredentials() {
174   credentials_provider =
175       std::make_shared<Aws::Auth::DefaultAWSCredentialsProviderChain>();
176 }
177 
ConfigureAnonymousCredentials()178 void S3Options::ConfigureAnonymousCredentials() {
179   credentials_provider = std::make_shared<Aws::Auth::AnonymousAWSCredentialsProvider>();
180 }
181 
ConfigureAccessKey(const std::string & access_key,const std::string & secret_key,const std::string & session_token)182 void S3Options::ConfigureAccessKey(const std::string& access_key,
183                                    const std::string& secret_key,
184                                    const std::string& session_token) {
185   credentials_provider = std::make_shared<Aws::Auth::SimpleAWSCredentialsProvider>(
186       ToAwsString(access_key), ToAwsString(secret_key), ToAwsString(session_token));
187 }
188 
ConfigureAssumeRoleCredentials(const std::string & role_arn,const std::string & session_name,const std::string & external_id,int load_frequency,const std::shared_ptr<Aws::STS::STSClient> & stsClient)189 void S3Options::ConfigureAssumeRoleCredentials(
190     const std::string& role_arn, const std::string& session_name,
191     const std::string& external_id, int load_frequency,
192     const std::shared_ptr<Aws::STS::STSClient>& stsClient) {
193   credentials_provider = std::make_shared<Aws::Auth::STSAssumeRoleCredentialsProvider>(
194       ToAwsString(role_arn), ToAwsString(session_name), ToAwsString(external_id),
195       load_frequency, stsClient);
196 }
197 
GetAccessKey() const198 std::string S3Options::GetAccessKey() const {
199   auto credentials = credentials_provider->GetAWSCredentials();
200   return std::string(FromAwsString(credentials.GetAWSAccessKeyId()));
201 }
202 
GetSecretKey() const203 std::string S3Options::GetSecretKey() const {
204   auto credentials = credentials_provider->GetAWSCredentials();
205   return std::string(FromAwsString(credentials.GetAWSSecretKey()));
206 }
207 
GetSessionToken() const208 std::string S3Options::GetSessionToken() const {
209   auto credentials = credentials_provider->GetAWSCredentials();
210   return std::string(FromAwsString(credentials.GetSessionToken()));
211 }
212 
Defaults()213 S3Options S3Options::Defaults() {
214   S3Options options;
215   options.ConfigureDefaultCredentials();
216   return options;
217 }
218 
Anonymous()219 S3Options S3Options::Anonymous() {
220   S3Options options;
221   options.ConfigureAnonymousCredentials();
222   return options;
223 }
224 
FromAccessKey(const std::string & access_key,const std::string & secret_key,const std::string & session_token)225 S3Options S3Options::FromAccessKey(const std::string& access_key,
226                                    const std::string& secret_key,
227                                    const std::string& session_token) {
228   S3Options options;
229   options.ConfigureAccessKey(access_key, secret_key, session_token);
230   return options;
231 }
232 
FromAssumeRole(const std::string & role_arn,const std::string & session_name,const std::string & external_id,int load_frequency,const std::shared_ptr<Aws::STS::STSClient> & stsClient)233 S3Options S3Options::FromAssumeRole(
234     const std::string& role_arn, const std::string& session_name,
235     const std::string& external_id, int load_frequency,
236     const std::shared_ptr<Aws::STS::STSClient>& stsClient) {
237   S3Options options;
238   options.role_arn = role_arn;
239   options.session_name = session_name;
240   options.external_id = external_id;
241   options.load_frequency = load_frequency;
242   options.ConfigureAssumeRoleCredentials(role_arn, session_name, external_id,
243                                          load_frequency, stsClient);
244   return options;
245 }
246 
FromUri(const Uri & uri,std::string * out_path)247 Result<S3Options> S3Options::FromUri(const Uri& uri, std::string* out_path) {
248   S3Options options;
249 
250   const auto bucket = uri.host();
251   auto path = uri.path();
252   if (bucket.empty()) {
253     if (!path.empty()) {
254       return Status::Invalid("Missing bucket name in S3 URI");
255     }
256   } else {
257     if (path.empty()) {
258       path = bucket;
259     } else {
260       if (path[0] != '/') {
261         return Status::Invalid("S3 URI should absolute, not relative");
262       }
263       path = bucket + path;
264     }
265   }
266   if (out_path != nullptr) {
267     *out_path = std::string(internal::RemoveTrailingSlash(path));
268   }
269 
270   std::unordered_map<std::string, std::string> options_map;
271   ARROW_ASSIGN_OR_RAISE(const auto options_items, uri.query_items());
272   for (const auto& kv : options_items) {
273     options_map.emplace(kv.first, kv.second);
274   }
275 
276   const auto username = uri.username();
277   if (!username.empty()) {
278     options.ConfigureAccessKey(username, uri.password());
279   } else {
280     options.ConfigureDefaultCredentials();
281   }
282 
283   bool region_set = false;
284   for (const auto& kv : options_map) {
285     if (kv.first == "region") {
286       options.region = kv.second;
287       region_set = true;
288     } else if (kv.first == "scheme") {
289       options.scheme = kv.second;
290     } else if (kv.first == "endpoint_override") {
291       options.endpoint_override = kv.second;
292     } else {
293       return Status::Invalid("Unexpected query parameter in S3 URI: '", kv.first, "'");
294     }
295   }
296 
297   if (!region_set && !bucket.empty() && options.endpoint_override.empty()) {
298     // XXX Should we use a dedicated resolver with the given credentials?
299     ARROW_ASSIGN_OR_RAISE(options.region, ResolveBucketRegion(bucket));
300   }
301 
302   return options;
303 }
304 
FromUri(const std::string & uri_string,std::string * out_path)305 Result<S3Options> S3Options::FromUri(const std::string& uri_string,
306                                      std::string* out_path) {
307   Uri uri;
308   RETURN_NOT_OK(uri.Parse(uri_string));
309   return FromUri(uri, out_path);
310 }
311 
Equals(const S3Options & other) const312 bool S3Options::Equals(const S3Options& other) const {
313   return (region == other.region && endpoint_override == other.endpoint_override &&
314           scheme == other.scheme && background_writes == other.background_writes &&
315           GetAccessKey() == other.GetAccessKey() &&
316           GetSecretKey() == other.GetSecretKey() &&
317           GetSessionToken() == other.GetSessionToken());
318 }
319 
320 namespace {
321 
CheckS3Initialized()322 Status CheckS3Initialized() {
323   if (!aws_initialized.load()) {
324     return Status::Invalid(
325         "S3 subsystem not initialized; please call InitializeS3() "
326         "before carrying out any S3-related operation");
327   }
328   return Status::OK();
329 }
330 
331 // XXX Sanitize paths by removing leading slash?
332 
// A S3 path decomposed into its components.
// `full_path` is "bucket/key"; `key_parts` is `key` split on '/'.
struct S3Path {
  std::string full_path;
  std::string bucket;
  std::string key;
  std::vector<std::string> key_parts;

  // Parse "bucket[/key...]".  A trailing slash is ignored; a leading slash
  // is an error.
  static Result<S3Path> FromString(const std::string& s) {
    const auto src = internal::RemoveTrailingSlash(s);
    auto first_sep = src.find_first_of(kSep);
    if (first_sep == 0) {
      return Status::Invalid("Path cannot start with a separator ('", s, "')");
    }
    if (first_sep == std::string::npos) {
      // No separator: the path names a bucket only (empty key)
      return S3Path{std::string(src), std::string(src), "", {}};
    }
    S3Path path;
    path.full_path = std::string(src);
    path.bucket = std::string(src.substr(0, first_sep));
    path.key = std::string(src.substr(first_sep + 1));
    path.key_parts = internal::SplitAbstractPath(path.key);
    RETURN_NOT_OK(Validate(&path));
    return path;
  }

  // Check the key parts for validity, contextualizing any error message
  // with the full path.
  static Status Validate(const S3Path* path) {
    auto result = internal::ValidateAbstractPathParts(path->key_parts);
    if (!result.ok()) {
      return Status::Invalid(result.message(), " in path ", path->full_path);
    } else {
      return result;
    }
  }

  Aws::String ToURLEncodedAwsString() const {
    // URL-encode individual parts, not the '/' separator
    Aws::String res;
    res += internal::ToURLEncodedAwsString(bucket);
    for (const auto& part : key_parts) {
      res += kSep;
      res += internal::ToURLEncodedAwsString(part);
    }
    return res;
  }

  // Return the path one level up (e.g. "b/x/y" -> "b/x").
  // Precondition: has_parent() is true (key_parts must be non-empty).
  S3Path parent() const {
    DCHECK(!key_parts.empty());
    auto parent = S3Path{"", bucket, "", key_parts};
    parent.key_parts.pop_back();
    parent.key = internal::JoinAbstractPath(parent.key_parts);
    parent.full_path = parent.bucket + kSep + parent.key;
    return parent;
  }

  // A path has a parent iff it has a key (a bare bucket does not).
  bool has_parent() const { return !key.empty(); }

  bool empty() const { return bucket.empty() && key.empty(); }

  // Equality only considers bucket and key; the derived fields
  // (full_path, key_parts) are redundant with them.
  bool operator==(const S3Path& other) const {
    return bucket == other.bucket && key == other.key;
  }
};
394 
395 // XXX return in OutcomeToStatus instead?
// Build a "path not found" error for a parsed S3 path.
Status PathNotFound(const S3Path& path) {
  return ::arrow::fs::internal::PathNotFound(path.full_path);
}
399 
// Build a "path not found" error from separate bucket and key components.
Status PathNotFound(const std::string& bucket, const std::string& key) {
  return ::arrow::fs::internal::PathNotFound(bucket + kSep + key);
}
403 
// Build a "not a file" error for a parsed S3 path.
Status NotAFile(const S3Path& path) {
  return ::arrow::fs::internal::NotAFile(path.full_path);
}
407 
ValidateFilePath(const S3Path & path)408 Status ValidateFilePath(const S3Path& path) {
409   if (path.bucket.empty() || path.key.empty()) {
410     return NotAFile(path);
411   }
412   return Status::OK();
413 }
414 
// Format a HTTP "Range" header value; both bounds are inclusive,
// hence the `- 1` on the upper bound.
std::string FormatRange(int64_t start, int64_t length) {
  const int64_t last = start + length - 1;
  std::stringstream header;
  header << "bytes=" << start << "-" << last;
  return header.str();
}
421 
// Wrapper around the AWS S3 client, adding a bucket region lookup that the
// stock client does not expose.
class S3Client : public Aws::S3::S3Client {
 public:
  using Aws::S3::S3Client::S3Client;

  // To get a bucket's region, we must extract the "x-amz-bucket-region" header
  // from the response to a HEAD bucket request.
  // Unfortunately, the S3Client APIs don't let us access the headers of successful
  // responses.  So we have to cook a AWS request and issue it ourselves.

  Result<std::string> GetBucketRegion(const S3Model::HeadBucketRequest& request) {
    auto uri = GeneratePresignedUrl(request.GetBucket(),
                                    /*key=*/"", Aws::Http::HttpMethod::HTTP_HEAD);
    // NOTE: The signer region argument isn't passed here, as there's no easy
    // way of computing it (the relevant method is private).
    auto outcome = MakeRequest(uri, request, Aws::Http::HttpMethod::HTTP_HEAD,
                               Aws::Auth::SIGV4_SIGNER);
    // The region header may be present on both successful and error
    // responses, so inspect whichever headers the outcome carries.
    const auto code = outcome.IsSuccess() ? outcome.GetResult().GetResponseCode()
                                          : outcome.GetError().GetResponseCode();
    const auto& headers = outcome.IsSuccess()
                              ? outcome.GetResult().GetHeaderValueCollection()
                              : outcome.GetError().GetResponseHeaders();

    const auto it = headers.find(ToAwsString("x-amz-bucket-region"));
    if (it == headers.end()) {
      // No region header: distinguish "no such bucket", transport/API error,
      // and a success response unexpectedly lacking the header.
      if (code == Aws::Http::HttpResponseCode::NOT_FOUND) {
        return Status::IOError("Bucket '", request.GetBucket(), "' not found");
      } else if (!outcome.IsSuccess()) {
        return ErrorToStatus(std::forward_as_tuple("When resolving region for bucket '",
                                                   request.GetBucket(), "': "),
                             outcome.GetError());
      } else {
        return Status::IOError("When resolving region for bucket '", request.GetBucket(),
                               "': missing 'x-amz-bucket-region' header in response");
      }
    }
    return std::string(FromAwsString(it->second));
  }

  // Convenience overload taking a plain bucket name.
  Result<std::string> GetBucketRegion(const std::string& bucket) {
    S3Model::HeadBucketRequest req;
    req.SetBucket(ToAwsString(bucket));
    return GetBucketRegion(req);
  }
};
466 
// In AWS SDK < 1.8, Aws::Client::ClientConfiguration::followRedirects is a bool.
// The dummy `Never` template parameter keeps this a template so that overload
// resolution only instantiates the version matching the actual member type.
template <bool Never = false>
void DisableRedirectsImpl(bool* followRedirects) {
  *followRedirects = false;
}

// In AWS SDK >= 1.8, it's a Aws::Client::FollowRedirectsPolicy scoped enum.
template <typename PolicyEnum, PolicyEnum Never = PolicyEnum::NEVER>
void DisableRedirectsImpl(PolicyEnum* followRedirects) {
  *followRedirects = Never;
}

// Dispatch to whichever overload matches the SDK version we're built against.
void DisableRedirects(Aws::Client::ClientConfiguration* c) {
  DisableRedirectsImpl(&c->followRedirects);
}
482 
// Translates S3Options into an AWS ClientConfiguration and builds S3Client
// instances from it.
class ClientBuilder {
 public:
  explicit ClientBuilder(S3Options options) : options_(std::move(options)) {}

  const Aws::Client::ClientConfiguration& config() const { return client_config_; }

  // Allow callers to tweak the configuration (e.g. DisableRedirects) before
  // BuildClient() is invoked.
  Aws::Client::ClientConfiguration* mutable_config() { return &client_config_; }

  Result<std::unique_ptr<S3Client>> BuildClient() {
    credentials_provider_ = options_.credentials_provider;
    if (!options_.region.empty()) {
      client_config_.region = ToAwsString(options_.region);
    }
    client_config_.endpointOverride = ToAwsString(options_.endpoint_override);
    if (options_.scheme == "http") {
      client_config_.scheme = Aws::Http::Scheme::HTTP;
    } else if (options_.scheme == "https") {
      client_config_.scheme = Aws::Http::Scheme::HTTPS;
    } else {
      return Status::Invalid("Invalid S3 connection scheme '", options_.scheme, "'");
    }
    // Retry behavior on connection errors is customized (see ConnectRetryStrategy).
    client_config_.retryStrategy = std::make_shared<ConnectRetryStrategy>();
    // Optional TLS trust store overrides from the global filesystem options.
    if (!internal::global_options.tls_ca_file_path.empty()) {
      client_config_.caFile = ToAwsString(internal::global_options.tls_ca_file_path);
    }
    if (!internal::global_options.tls_ca_dir_path.empty()) {
      client_config_.caPath = ToAwsString(internal::global_options.tls_ca_dir_path);
    }

    // Only use virtual-host addressing against the default AWS endpoints;
    // a custom endpoint (e.g. minio) gets path-style addressing.
    const bool use_virtual_addressing = options_.endpoint_override.empty();
    return std::unique_ptr<S3Client>(
        new S3Client(credentials_provider_, client_config_,
                     Aws::Client::AWSAuthV4Signer::PayloadSigningPolicy::Never,
                     use_virtual_addressing));
  }

  const S3Options& options() const { return options_; }

 protected:
  S3Options options_;
  Aws::Client::ClientConfiguration client_config_;
  std::shared_ptr<Aws::Auth::AWSCredentialsProvider> credentials_provider_;
};
526 
527 // -----------------------------------------------------------------------
528 // S3 region resolver
529 
// Resolves the AWS region of S3 buckets, caching results per bucket name.
class RegionResolver {
 public:
  static Result<std::shared_ptr<RegionResolver>> Make(S3Options options) {
    std::shared_ptr<RegionResolver> resolver(new RegionResolver(std::move(options)));
    RETURN_NOT_OK(resolver->Init());
    return resolver;
  }

  // Return a lazily-created, process-wide resolver instance using anonymous
  // credentials.
  static Result<std::shared_ptr<RegionResolver>> DefaultInstance() {
    static std::shared_ptr<RegionResolver> instance;
    auto resolver = arrow::internal::atomic_load(&instance);
    if (resolver) {
      return resolver;
    }
    auto maybe_resolver = Make(S3Options::Anonymous());
    if (!maybe_resolver.ok()) {
      return maybe_resolver;
    }
    // Make sure to always return the same instance even if several threads
    // call DefaultInstance at once.
    std::shared_ptr<RegionResolver> existing;
    if (arrow::internal::atomic_compare_exchange_strong(&instance, &existing,
                                                        *maybe_resolver)) {
      return *maybe_resolver;
    } else {
      // Another thread won the race; use its instance and drop ours.
      return existing;
    }
  }

  Result<std::string> ResolveRegion(const std::string& bucket) {
    std::unique_lock<std::mutex> lock(cache_mutex_);
    auto it = cache_.find(bucket);
    if (it != cache_.end()) {
      return it->second;
    }
    // Release the lock while querying S3 so other lookups are not blocked;
    // concurrent misses may resolve redundantly but will store the same value.
    lock.unlock();
    ARROW_ASSIGN_OR_RAISE(auto region, ResolveRegionUncached(bucket));
    lock.lock();
    // Note we don't cache a non-existent bucket, as the bucket could be created later
    cache_[bucket] = region;
    return region;
  }

  // Always query S3, bypassing the cache.
  Result<std::string> ResolveRegionUncached(const std::string& bucket) {
    return client_->GetBucketRegion(bucket);
  }

 protected:
  explicit RegionResolver(S3Options options) : builder_(std::move(options)) {}

  Status Init() {
    DCHECK(builder_.options().endpoint_override.empty());
    // On Windows with AWS SDK >= 1.8, it is necessary to disable redirects (ARROW-10085).
    DisableRedirects(builder_.mutable_config());
    return builder_.BuildClient().Value(&client_);
  }

  ClientBuilder builder_;
  std::unique_ptr<S3Client> client_;

  // Protects cache_ below.
  std::mutex cache_mutex_;
  // XXX Should cache size be bounded?  It must be quite unusual to query millions
  // of different buckets in a single program invocation...
  std::unordered_map<std::string, std::string> cache_;
};
595 
596 // -----------------------------------------------------------------------
597 // S3 file stream implementations
598 
599 // A non-copying iostream.
600 // See https://stackoverflow.com/questions/35322033/aws-c-sdk-uploadpart-times-out
601 // https://stackoverflow.com/questions/13059091/creating-an-input-stream-from-constant-memory
class StringViewStream : Aws::Utils::Stream::PreallocatedStreamBuf, public std::iostream {
 public:
  // Expose `data` (of `nbytes` bytes) as an iostream operating on that
  // buffer in place.  The streambuf base is listed first so that it is
  // fully constructed before std::iostream receives `this` as its buffer.
  // NOTE(review): the const_cast assumes the SDK never writes through a
  // stream constructed over read-only data.
  StringViewStream(const void* data, int64_t nbytes)
      : Aws::Utils::Stream::PreallocatedStreamBuf(
            reinterpret_cast<unsigned char*>(const_cast<void*>(data)),
            static_cast<size_t>(nbytes)),
        std::iostream(this) {}
};
610 
611 // By default, the AWS SDK reads object data into an auto-growing StringStream.
612 // To avoid copies, read directly into our preallocated buffer instead.
613 // See https://github.com/aws/aws-sdk-cpp/issues/64 for an alternative but
614 // functionally similar recipe.
AwsWriteableStreamFactory(void * data,int64_t nbytes)615 Aws::IOStreamFactory AwsWriteableStreamFactory(void* data, int64_t nbytes) {
616   return [=]() { return Aws::New<StringViewStream>("", data, nbytes); };
617 }
618 
GetObjectRange(Aws::S3::S3Client * client,const S3Path & path,int64_t start,int64_t length,void * out)619 Result<S3Model::GetObjectResult> GetObjectRange(Aws::S3::S3Client* client,
620                                                 const S3Path& path, int64_t start,
621                                                 int64_t length, void* out) {
622   S3Model::GetObjectRequest req;
623   req.SetBucket(ToAwsString(path.bucket));
624   req.SetKey(ToAwsString(path.key));
625   req.SetRange(ToAwsString(FormatRange(start, length)));
626   req.SetResponseStreamFactory(AwsWriteableStreamFactory(out, length));
627   return OutcomeToResult(client->GetObject(req));
628 }
629 
// A RandomAccessFile that reads from a S3 object
class ObjectInputFile final : public io::RandomAccessFile {
 public:
  // `fs` is retained because it owns `client`; the raw pointer stays valid
  // for the lifetime of this file.  If `size` is already known, passing it
  // lets Init() skip the HEAD request.
  ObjectInputFile(std::shared_ptr<FileSystem> fs, Aws::S3::S3Client* client,
                  const S3Path& path, int64_t size = kNoSize)
      : fs_(std::move(fs)), client_(client), path_(path), content_length_(size) {}

  Status Init() {
    // Issue a HEAD Object to get the content-length and ensure any
    // errors (e.g. file not found) don't wait until the first Read() call.
    if (content_length_ != kNoSize) {
      DCHECK_GE(content_length_, 0);
      return Status::OK();
    }

    S3Model::HeadObjectRequest req;
    req.SetBucket(ToAwsString(path_.bucket));
    req.SetKey(ToAwsString(path_.key));

    auto outcome = client_->HeadObject(req);
    if (!outcome.IsSuccess()) {
      if (IsNotFound(outcome.GetError())) {
        return PathNotFound(path_);
      } else {
        return ErrorToStatus(
            std::forward_as_tuple("When reading information for key '", path_.key,
                                  "' in bucket '", path_.bucket, "': "),
            outcome.GetError());
      }
    }
    content_length_ = outcome.GetResult().GetContentLength();
    DCHECK_GE(content_length_, 0);
    return Status::OK();
  }

  // Return an error if the stream was already closed.
  Status CheckClosed() const {
    if (closed_) {
      return Status::Invalid("Operation on closed stream");
    }
    return Status::OK();
  }

  // Validate that `position` lies within [0, content_length_].
  Status CheckPosition(int64_t position, const char* action) const {
    if (position < 0) {
      return Status::Invalid("Cannot ", action, " from negative position");
    }
    if (position > content_length_) {
      return Status::IOError("Cannot ", action, " past end of file");
    }
    return Status::OK();
  }

  // RandomAccessFile APIs

  Status Close() override {
    // Drop the owning filesystem (and thereby the client); subsequent
    // operations fail CheckClosed().
    fs_.reset();
    client_ = nullptr;
    closed_ = true;
    return Status::OK();
  }

  bool closed() const override { return closed_; }

  Result<int64_t> Tell() const override {
    RETURN_NOT_OK(CheckClosed());
    return pos_;
  }

  Result<int64_t> GetSize() override {
    RETURN_NOT_OK(CheckClosed());
    return content_length_;
  }

  Status Seek(int64_t position) override {
    RETURN_NOT_OK(CheckClosed());
    RETURN_NOT_OK(CheckPosition(position, "seek"));

    pos_ = position;
    return Status::OK();
  }

  Result<int64_t> ReadAt(int64_t position, int64_t nbytes, void* out) override {
    RETURN_NOT_OK(CheckClosed());
    RETURN_NOT_OK(CheckPosition(position, "read"));

    // Short reads at end-of-file are allowed: clamp to the remaining bytes.
    nbytes = std::min(nbytes, content_length_ - position);
    if (nbytes == 0) {
      return 0;
    }

    // Read the desired range of bytes
    ARROW_ASSIGN_OR_RAISE(S3Model::GetObjectResult result,
                          GetObjectRange(client_, path_, position, nbytes, out));

    // The payload was already streamed into `out` by the preallocated stream
    // factory; ignore() merely advances the stream so that gcount() reports
    // the number of bytes actually received.
    auto& stream = result.GetBody();
    stream.ignore(nbytes);
    // NOTE: the stream is a stringstream by default, there is no actual error
    // to check for.  However, stream.fail() may return true if EOF is reached.
    return stream.gcount();
  }

  Result<std::shared_ptr<Buffer>> ReadAt(int64_t position, int64_t nbytes) override {
    RETURN_NOT_OK(CheckClosed());
    RETURN_NOT_OK(CheckPosition(position, "read"));

    // No need to allocate more than the remaining number of bytes
    nbytes = std::min(nbytes, content_length_ - position);

    ARROW_ASSIGN_OR_RAISE(auto buf, AllocateResizableBuffer(nbytes));
    if (nbytes > 0) {
      ARROW_ASSIGN_OR_RAISE(int64_t bytes_read,
                            ReadAt(position, nbytes, buf->mutable_data()));
      DCHECK_LE(bytes_read, nbytes);
      // Shrink the buffer in case of a short read.
      RETURN_NOT_OK(buf->Resize(bytes_read));
    }
    return std::move(buf);
  }

  Result<int64_t> Read(int64_t nbytes, void* out) override {
    ARROW_ASSIGN_OR_RAISE(int64_t bytes_read, ReadAt(pos_, nbytes, out));
    pos_ += bytes_read;
    return bytes_read;
  }

  Result<std::shared_ptr<Buffer>> Read(int64_t nbytes) override {
    ARROW_ASSIGN_OR_RAISE(auto buffer, ReadAt(pos_, nbytes));
    pos_ += buffer->size();
    return std::move(buffer);
  }

 protected:
  std::shared_ptr<FileSystem> fs_;  // Owner of S3Client
  Aws::S3::S3Client* client_;       // Borrowed; owned by fs_
  S3Path path_;
  bool closed_ = false;
  int64_t pos_ = 0;  // Current seek position for the Read() overloads
  int64_t content_length_ = kNoSize;
};
768 
// Minimum size for each part of a multipart upload, except for the last part.
// AWS doc says "5 MB" but it's not clear whether those are MB or MiB,
// so I chose the safer value (5 MiB).
// (see https://docs.aws.amazon.com/AmazonS3/latest/API/mpUploadUploadPart.html)
static constexpr int64_t kMinimumPartUpload = 5 * 1024 * 1024;
774 
// An OutputStream that writes to a S3 object
//
// Implemented on top of the S3 multi-part upload API: Init() starts an
// upload, each accumulated chunk becomes one part, Close() completes the
// upload and Abort() cancels it.  When options_.background_writes is true,
// parts are uploaded asynchronously and completion is tracked in
// UploadState, shared with the async handlers.
class ObjectOutputStream final : public io::OutputStream {
 protected:
  struct UploadState;

 public:
  ObjectOutputStream(std::shared_ptr<FileSystem> fs, Aws::S3::S3Client* client,
                     const S3Path& path, const S3Options& options)
      : fs_(std::move(fs)), client_(client), path_(path), options_(options) {}

  ~ObjectOutputStream() override {
    // For compliance with the rest of the IO stack, Close rather than Abort,
    // even though it may be more expensive.
    io::internal::CloseFromDestructor(this);
  }

  // Start the multi-part upload.  Must succeed before any Write();
  // the stream stays flagged closed until it does.
  Status Init() {
    // Initiate the multi-part upload
    S3Model::CreateMultipartUploadRequest req;
    req.SetBucket(ToAwsString(path_.bucket));
    req.SetKey(ToAwsString(path_.key));

    auto outcome = client_->CreateMultipartUpload(req);
    if (!outcome.IsSuccess()) {
      return ErrorToStatus(
          std::forward_as_tuple("When initiating multiple part upload for key '",
                                path_.key, "' in bucket '", path_.bucket, "': "),
          outcome.GetError());
    }
    upload_id_ = outcome.GetResult().GetUploadId();
    upload_state_ = std::make_shared<UploadState>();
    closed_ = false;
    return Status::OK();
  }

  // Cancel the multi-part upload server-side and drop buffered data.
  // No-op if the stream is already closed.
  Status Abort() override {
    if (closed_) {
      return Status::OK();
    }

    S3Model::AbortMultipartUploadRequest req;
    req.SetBucket(ToAwsString(path_.bucket));
    req.SetKey(ToAwsString(path_.key));
    req.SetUploadId(upload_id_);

    auto outcome = client_->AbortMultipartUpload(req);
    if (!outcome.IsSuccess()) {
      return ErrorToStatus(
          std::forward_as_tuple("When aborting multiple part upload for key '", path_.key,
                                "' in bucket '", path_.bucket, "': "),
          outcome.GetError());
    }
    current_part_.reset();
    fs_.reset();
    client_ = nullptr;
    closed_ = true;
    return Status::OK();
  }

  // OutputStream interface

  // Flush any buffered part, wait for all in-flight uploads, then
  // complete the multi-part upload.  Idempotent.
  Status Close() override {
    if (closed_) {
      return Status::OK();
    }

    if (current_part_) {
      // Upload last part
      RETURN_NOT_OK(CommitCurrentPart());
    }

    // S3 mandates at least one part, upload an empty one if necessary
    if (part_number_ == 1) {
      RETURN_NOT_OK(UploadPart("", 0));
    }

    // Wait for in-progress uploads to finish (if async writes are enabled)
    RETURN_NOT_OK(Flush());

    // At this point, all part uploads have finished successfully
    DCHECK_GT(part_number_, 1);
    DCHECK_EQ(upload_state_->completed_parts.size(),
              static_cast<size_t>(part_number_ - 1));

    S3Model::CompletedMultipartUpload completed_upload;
    completed_upload.SetParts(upload_state_->completed_parts);
    S3Model::CompleteMultipartUploadRequest req;
    req.SetBucket(ToAwsString(path_.bucket));
    req.SetKey(ToAwsString(path_.key));
    req.SetUploadId(upload_id_);
    req.SetMultipartUpload(std::move(completed_upload));

    auto outcome = client_->CompleteMultipartUpload(req);
    if (!outcome.IsSuccess()) {
      return ErrorToStatus(
          std::forward_as_tuple("When completing multiple part upload for key '",
                                path_.key, "' in bucket '", path_.bucket, "': "),
          outcome.GetError());
    }

    fs_.reset();
    client_ = nullptr;
    closed_ = true;
    return Status::OK();
  }

  bool closed() const override { return closed_; }

  Result<int64_t> Tell() const override {
    if (closed_) {
      return Status::Invalid("Operation on closed stream");
    }
    return pos_;
  }

  Status Write(const std::shared_ptr<Buffer>& buffer) override {
    // Pass the buffer along so background uploads can keep the bytes
    // alive without copying them.
    return DoWrite(buffer->data(), buffer->size(), buffer);
  }

  Status Write(const void* data, int64_t nbytes) override {
    return DoWrite(data, nbytes);
  }

  // Common implementation for both Write() overloads.  If `owned_buffer`
  // is non-null it owns `data` and may be retained by async uploads.
  Status DoWrite(const void* data, int64_t nbytes,
                 std::shared_ptr<Buffer> owned_buffer = nullptr) {
    if (closed_) {
      return Status::Invalid("Operation on closed stream");
    }

    if (!current_part_ && nbytes >= part_upload_threshold_) {
      // No current part and data large enough, upload it directly
      // (without copying if the buffer is owned)
      RETURN_NOT_OK(UploadPart(data, nbytes, owned_buffer));
      pos_ += nbytes;
      return Status::OK();
    }
    // Can't upload data on its own, need to buffer it
    if (!current_part_) {
      ARROW_ASSIGN_OR_RAISE(current_part_,
                            io::BufferOutputStream::Create(part_upload_threshold_));
      current_part_size_ = 0;
    }
    RETURN_NOT_OK(current_part_->Write(data, nbytes));
    pos_ += nbytes;
    current_part_size_ += nbytes;

    if (current_part_size_ >= part_upload_threshold_) {
      // Current part large enough, upload it
      RETURN_NOT_OK(CommitCurrentPart());
    }

    return Status::OK();
  }

  // Block until every in-flight part upload has completed, then return
  // the accumulated status of those uploads.
  Status Flush() override {
    if (closed_) {
      return Status::Invalid("Operation on closed stream");
    }
    // Wait for background writes to finish
    std::unique_lock<std::mutex> lock(upload_state_->mutex);
    upload_state_->cv.wait(lock,
                           [this]() { return upload_state_->parts_in_progress == 0; });
    return upload_state_->status;
  }

  // Upload-related helpers

  // Finish the currently buffered part and upload it.
  Status CommitCurrentPart() {
    ARROW_ASSIGN_OR_RAISE(auto buf, current_part_->Finish());
    current_part_.reset();
    current_part_size_ = 0;
    return UploadPart(buf);
  }

  Status UploadPart(std::shared_ptr<Buffer> buffer) {
    return UploadPart(buffer->data(), buffer->size(), buffer);
  }

  // Upload one part, synchronously or (with background_writes) via
  // UploadPartAsync.  In the async case, `owned_buffer` (or a private
  // copy of `data`) is captured by the completion handler so the bytes
  // stay alive for the whole request.
  Status UploadPart(const void* data, int64_t nbytes,
                    std::shared_ptr<Buffer> owned_buffer = nullptr) {
    S3Model::UploadPartRequest req;
    req.SetBucket(ToAwsString(path_.bucket));
    req.SetKey(ToAwsString(path_.key));
    req.SetUploadId(upload_id_);
    req.SetPartNumber(part_number_);
    req.SetContentLength(nbytes);

    if (!options_.background_writes) {
      req.SetBody(std::make_shared<StringViewStream>(data, nbytes));
      auto outcome = client_->UploadPart(req);
      if (!outcome.IsSuccess()) {
        return UploadPartError(req, outcome);
      } else {
        AddCompletedPart(upload_state_, part_number_, outcome.GetResult());
      }
    } else {
      std::unique_lock<std::mutex> lock(upload_state_->mutex);
      auto state = upload_state_;  // Keep upload state alive in closure
      auto part_number = part_number_;

      // If the data isn't owned, make an immutable copy for the lifetime of the closure
      if (owned_buffer == nullptr) {
        ARROW_ASSIGN_OR_RAISE(owned_buffer, AllocateBuffer(nbytes));
        memcpy(owned_buffer->mutable_data(), data, nbytes);
      } else {
        DCHECK_EQ(data, owned_buffer->data());
        DCHECK_EQ(nbytes, owned_buffer->size());
      }
      req.SetBody(
          std::make_shared<StringViewStream>(owned_buffer->data(), owned_buffer->size()));

      // Completion handler: records the part (or folds the error into
      // state->status) under the state mutex, and wakes up Flush() when
      // the last in-flight upload finishes.
      auto handler =
          [state, owned_buffer, part_number](
              const Aws::S3::S3Client*, const S3Model::UploadPartRequest& req,
              const S3Model::UploadPartOutcome& outcome,
              const std::shared_ptr<const Aws::Client::AsyncCallerContext>&) -> void {
        std::unique_lock<std::mutex> lock(state->mutex);
        if (!outcome.IsSuccess()) {
          state->status &= UploadPartError(req, outcome);
        } else {
          AddCompletedPart(state, part_number, outcome.GetResult());
        }
        // Notify completion, regardless of success / error status
        if (--state->parts_in_progress == 0) {
          state->cv.notify_all();
        }
      };
      ++upload_state_->parts_in_progress;
      client_->UploadPartAsync(req, handler);
    }

    ++part_number_;
    // With up to 10000 parts in an upload (S3 limit), a stream writing chunks
    // of exactly 5MB would be limited to 50GB total.  To avoid that, we bump
    // the upload threshold every 100 parts.  So the pattern is:
    // - part 1 to 99: 5MB threshold
    // - part 100 to 199: 10MB threshold
    // - part 200 to 299: 15MB threshold
    // ...
    // - part 9900 to 9999: 500MB threshold
    // So the total size limit is 2475000MB or ~2.4TB, while keeping manageable
    // chunk sizes and avoiding too much buffering in the common case of a small-ish
    // stream.  If the limit's not enough, we can revisit.
    if (part_number_ % 100 == 0) {
      part_upload_threshold_ += kMinimumPartUpload;
    }

    return Status::OK();
  }

  // Record a successfully uploaded part.  Parts may complete out of
  // order, so the vector is grown on demand and indexed by part number.
  static void AddCompletedPart(const std::shared_ptr<UploadState>& state, int part_number,
                               const S3Model::UploadPartResult& result) {
    S3Model::CompletedPart part;
    // Append ETag and part number for this uploaded part
    // (will be needed for upload completion in Close())
    part.SetPartNumber(part_number);
    part.SetETag(result.GetETag());
    int slot = part_number - 1;
    if (state->completed_parts.size() <= static_cast<size_t>(slot)) {
      state->completed_parts.resize(slot + 1);
    }
    DCHECK(!state->completed_parts[slot].PartNumberHasBeenSet());
    state->completed_parts[slot] = std::move(part);
  }

  // Wrap an UploadPart failure into a descriptive Status.
  static Status UploadPartError(const S3Model::UploadPartRequest& req,
                                const S3Model::UploadPartOutcome& outcome) {
    return ErrorToStatus(
        std::forward_as_tuple("When uploading part for key '", req.GetKey(),
                              "' in bucket '", req.GetBucket(), "': "),
        outcome.GetError());
  }

 protected:
  std::shared_ptr<FileSystem> fs_;  // Owner of S3Client
  Aws::S3::S3Client* client_;
  S3Path path_;
  const S3Options& options_;
  // Upload id assigned by CreateMultipartUpload in Init()
  Aws::String upload_id_;
  // Starts true: the stream is unusable until Init() succeeds
  bool closed_ = true;
  int64_t pos_ = 0;
  // 1-based, as required by the S3 multipart API
  int32_t part_number_ = 1;
  std::shared_ptr<io::BufferOutputStream> current_part_;
  int64_t current_part_size_ = 0;
  int64_t part_upload_threshold_ = kMinimumPartUpload;

  // This struct is kept alive through background writes to avoid problems
  // in the completion handler.
  struct UploadState {
    std::mutex mutex;
    std::condition_variable cv;
    Aws::Vector<S3Model::CompletedPart> completed_parts;
    int64_t parts_in_progress = 0;
    Status status;

    UploadState() : status(Status::OK()) {}
  };
  std::shared_ptr<UploadState> upload_state_;
};
1074 
1075 // This function assumes info->path() is already set
FileObjectToInfo(const S3Model::HeadObjectResult & obj,FileInfo * info)1076 void FileObjectToInfo(const S3Model::HeadObjectResult& obj, FileInfo* info) {
1077   info->set_type(FileType::File);
1078   info->set_size(static_cast<int64_t>(obj.GetContentLength()));
1079   info->set_mtime(FromAwsDatetime(obj.GetLastModified()));
1080 }
1081 
FileObjectToInfo(const S3Model::Object & obj,FileInfo * info)1082 void FileObjectToInfo(const S3Model::Object& obj, FileInfo* info) {
1083   info->set_type(FileType::File);
1084   info->set_size(static_cast<int64_t>(obj.GetSize()));
1085   info->set_mtime(FromAwsDatetime(obj.GetLastModified()));
1086 }
1087 
// Walks a directory tree by issuing asynchronous (possibly concurrent)
// ListObjectsV2 requests, one per "directory" level, plus continuation
// requests for truncated pages.  User-supplied callbacks decide how
// results, errors and recursion are handled.
struct TreeWalker : public std::enable_shared_from_this<TreeWalker> {
  // Called for each successful listing page; `prefix` is the listed "directory".
  using ResultHandler = std::function<Status(const std::string& prefix,
                                             const S3Model::ListObjectsV2Result&)>;
  // Called on listing errors; returning OK swallows the error.
  using ErrorHandler = std::function<Status(const AWSError<S3Errors>& error)>;
  // Called before descending; returns whether to recurse at this depth.
  using RecursionHandler = std::function<Result<bool>(int32_t nesting_depth)>;

  Aws::S3::S3Client* client_;
  const std::string bucket_;
  const std::string base_dir_;
  const int32_t max_keys_;
  const ResultHandler result_handler_;
  const ErrorHandler error_handler_;
  const RecursionHandler recursion_handler_;

  // Entry point: the walker is held in a shared_ptr so async handlers can
  // keep it alive beyond this call frame.
  template <typename... Args>
  static Status Walk(Args&&... args) {
    auto self = std::make_shared<TreeWalker>(std::forward<Args>(args)...);
    return self->DoWalk();
  }

  TreeWalker(Aws::S3::S3Client* client, std::string bucket, std::string base_dir,
             int32_t max_keys, ResultHandler result_handler, ErrorHandler error_handler,
             RecursionHandler recursion_handler)
      : client_(std::move(client)),
        bucket_(std::move(bucket)),
        base_dir_(std::move(base_dir)),
        max_keys_(max_keys),
        result_handler_(std::move(result_handler)),
        error_handler_(std::move(error_handler)),
        recursion_handler_(std::move(recursion_handler)) {}

 private:
  // Serializes the async handler callbacks
  std::mutex mutex_;
  // Marked finished when the walk completes or first fails
  Future<> future_;
  // Number of outstanding ListObjectsV2 requests
  std::atomic<int32_t> num_in_flight_;

  // Kick off the walk at the base directory and block until all requests
  // have completed or an error has been reported.
  Status DoWalk() {
    future_ = decltype(future_)::Make();
    num_in_flight_ = 0;
    WalkChild(base_dir_, /*nesting_depth=*/0);
    // When this returns, ListObjectsV2 tasks either have finished or will exit early
    return future_.status();
  }

  bool is_finished() const { return future_.is_finished(); }

  // Called once per finished listing: ends the walk on the first error,
  // or when the last outstanding request completes.
  void ListObjectsFinished(Status st) {
    const auto in_flight = --num_in_flight_;
    if (!st.ok() || !in_flight) {
      future_.MarkFinished(std::move(st));
    }
  }

  // Async callback state for one listing (and its continuation pages).
  struct ListObjectsV2Handler {
    // Keeps the walker alive while the request is in flight
    std::shared_ptr<TreeWalker> walker;
    std::string prefix;
    int32_t nesting_depth;
    S3Model::ListObjectsV2Request req;

    void operator()(const Aws::S3::S3Client*, const S3Model::ListObjectsV2Request&,
                    const S3Model::ListObjectsV2Outcome& outcome,
                    const std::shared_ptr<const Aws::Client::AsyncCallerContext>&) {
      // Serialize calls to operation-specific handlers
      std::unique_lock<std::mutex> guard(walker->mutex_);
      if (walker->is_finished()) {
        // Early exit: avoid executing handlers if DoWalk() returned
        return;
      }
      if (!outcome.IsSuccess()) {
        Status st = walker->error_handler_(outcome.GetError());
        walker->ListObjectsFinished(std::move(st));
        return;
      }
      HandleResult(outcome.GetResult());
    }

    // Process one page: maybe recurse into subdirectories, forward the
    // page to the result handler, then fetch the next page or finish.
    void HandleResult(const S3Model::ListObjectsV2Result& result) {
      bool recurse = result.GetCommonPrefixes().size() > 0;
      if (recurse) {
        auto maybe_recurse = walker->recursion_handler_(nesting_depth + 1);
        if (!maybe_recurse.ok()) {
          walker->ListObjectsFinished(maybe_recurse.status());
          return;
        }
        recurse &= *maybe_recurse;
      }
      Status st = walker->result_handler_(prefix, result);
      if (!st.ok()) {
        walker->ListObjectsFinished(std::move(st));
        return;
      }
      if (recurse) {
        walker->WalkChildren(result, nesting_depth + 1);
      }
      // If the result was truncated, issue a continuation request to get
      // further directory entries.
      if (result.GetIsTruncated()) {
        DCHECK(!result.GetNextContinuationToken().empty());
        req.SetContinuationToken(result.GetNextContinuationToken());
        walker->client_->ListObjectsV2Async(req, *this);
      } else {
        walker->ListObjectsFinished(Status::OK());
      }
    }

    // Issue the initial listing request for this prefix.
    void Start() {
      req.SetBucket(ToAwsString(walker->bucket_));
      if (!prefix.empty()) {
        req.SetPrefix(ToAwsString(prefix) + kSep);
      }
      req.SetDelimiter(Aws::String() + kSep);
      req.SetMaxKeys(walker->max_keys_);
      walker->client_->ListObjectsV2Async(req, *this);
    }
  };

  // Launch an asynchronous listing of `key` at the given depth.
  void WalkChild(std::string key, int32_t nesting_depth) {
    ListObjectsV2Handler handler{shared_from_this(), std::move(key), nesting_depth, {}};
    ++num_in_flight_;
    handler.Start();
  }

  // Launch listings for every subdirectory ("common prefix") of a page.
  void WalkChildren(const S3Model::ListObjectsV2Result& result, int32_t nesting_depth) {
    for (const auto& prefix : result.GetCommonPrefixes()) {
      const auto child_key =
          internal::RemoveTrailingSlash(FromAwsString(prefix.GetPrefix()));
      WalkChild(std::string{child_key}, nesting_depth);
    }
  }

  friend struct ListObjectsV2Handler;
};
1220 
1221 }  // namespace
1222 
1223 // -----------------------------------------------------------------------
1224 // S3 filesystem implementation
1225 
1226 class S3FileSystem::Impl {
1227  public:
1228   ClientBuilder builder_;
1229   std::unique_ptr<Aws::S3::S3Client> client_;
1230   util::optional<S3Backend> backend_;
1231 
1232   const int32_t kListObjectsMaxKeys = 1000;
1233   // At most 1000 keys per multiple-delete request
1234   const int32_t kMultipleDeleteMaxKeys = 1000;
1235   // Limit recursing depth, since a recursion bomb can be created
1236   const int32_t kMaxNestingDepth = 100;
1237 
  // Stores the options; the client itself is created later in Init().
  explicit Impl(S3Options options) : builder_(std::move(options)) {}
1239 
  // Build the underlying AWS S3 client from the configured options.
  Status Init() { return builder_.BuildClient().Value(&client_); }
1241 
  // Options this filesystem was created with.
  const S3Options& options() const { return builder_.options(); }
1243 
region() const1244   std::string region() const {
1245     return std::string(FromAwsString(builder_.config().region));
1246   }
1247 
  // Remember which S3 backend (e.g. Minio vs. others) we are talking to,
  // deduced from an error response.  Only overwrite a previous detection
  // if it was inconclusive (S3Backend::Other).
  template <typename Error>
  void SaveBackend(const Aws::Client::AWSError<Error>& error) {
    if (!backend_ || *backend_ == S3Backend::Other) {
      backend_ = DetectS3Backend(error);
    }
  }
1254 
1255   // Create a bucket.  Successful if bucket already exists.
CreateBucket(const std::string & bucket)1256   Status CreateBucket(const std::string& bucket) {
1257     S3Model::CreateBucketConfiguration config;
1258     S3Model::CreateBucketRequest req;
1259     config.SetLocationConstraint(
1260         S3Model::BucketLocationConstraintMapper::GetBucketLocationConstraintForName(
1261             ToAwsString(options().region)));
1262     req.SetBucket(ToAwsString(bucket));
1263     req.SetCreateBucketConfiguration(config);
1264 
1265     auto outcome = client_->CreateBucket(req);
1266     if (!outcome.IsSuccess() && !IsAlreadyExists(outcome.GetError())) {
1267       return ErrorToStatus(std::forward_as_tuple("When creating bucket '", bucket, "': "),
1268                            outcome.GetError());
1269     }
1270     return Status::OK();
1271   }
1272 
1273   // Create an object with empty contents.  Successful if object already exists.
CreateEmptyObject(const std::string & bucket,const std::string & key)1274   Status CreateEmptyObject(const std::string& bucket, const std::string& key) {
1275     S3Model::PutObjectRequest req;
1276     req.SetBucket(ToAwsString(bucket));
1277     req.SetKey(ToAwsString(key));
1278     return OutcomeToStatus(
1279         std::forward_as_tuple("When creating key '", key, "' in bucket '", bucket, "': "),
1280         client_->PutObject(req));
1281   }
1282 
  // An empty "directory" is materialized as an empty object whose key
  // ends with the separator.
  Status CreateEmptyDir(const std::string& bucket, const std::string& key) {
    DCHECK(!key.empty());
    return CreateEmptyObject(bucket, key + kSep);
  }
1287 
DeleteObject(const std::string & bucket,const std::string & key)1288   Status DeleteObject(const std::string& bucket, const std::string& key) {
1289     S3Model::DeleteObjectRequest req;
1290     req.SetBucket(ToAwsString(bucket));
1291     req.SetKey(ToAwsString(key));
1292     return OutcomeToStatus(
1293         std::forward_as_tuple("When delete key '", key, "' in bucket '", bucket, "': "),
1294         client_->DeleteObject(req));
1295   }
1296 
CopyObject(const S3Path & src_path,const S3Path & dest_path)1297   Status CopyObject(const S3Path& src_path, const S3Path& dest_path) {
1298     S3Model::CopyObjectRequest req;
1299     req.SetBucket(ToAwsString(dest_path.bucket));
1300     req.SetKey(ToAwsString(dest_path.key));
1301     // Copy source "Must be URL-encoded" according to AWS SDK docs.
1302     req.SetCopySource(src_path.ToURLEncodedAwsString());
1303     return OutcomeToStatus(
1304         std::forward_as_tuple("When copying key '", src_path.key, "' in bucket '",
1305                               src_path.bucket, "' to key '", dest_path.key,
1306                               "' in bucket '", dest_path.bucket, "': "),
1307         client_->CopyObject(req));
1308   }
1309 
  // On Minio, an empty "directory" doesn't satisfy the same API requests as
  // a non-empty "directory".  This is a Minio-specific quirk, but we need
  // to handle it for unit testing.

  // Check whether bucket/key exists as an empty "directory" marker object.
  // "Not found" is reported as *out = false, not as an error.  On the first
  // failure the backend is detected and, if it's Minio, the check is retried
  // once with the Minio-style key.
  Status IsEmptyDirectory(const std::string& bucket, const std::string& key, bool* out) {
    S3Model::HeadObjectRequest req;
    req.SetBucket(ToAwsString(bucket));
    if (backend_ && *backend_ == S3Backend::Minio) {
      // Minio wants a slash at the end, Amazon doesn't
      req.SetKey(ToAwsString(key) + kSep);
    } else {
      req.SetKey(ToAwsString(key));
    }

    auto outcome = client_->HeadObject(req);
    if (outcome.IsSuccess()) {
      *out = true;
      return Status::OK();
    }
    if (!backend_) {
      // Backend not yet known: detect it from this error, then retry with
      // the backend-appropriate key format if needed (recursion terminates
      // because backend_ is now set).
      SaveBackend(outcome.GetError());
      DCHECK(backend_);
      if (*backend_ == S3Backend::Minio) {
        // Try again with separator-terminated key (see above)
        return IsEmptyDirectory(bucket, key, out);
      }
    }
    if (IsNotFound(outcome.GetError())) {
      *out = false;
      return Status::OK();
    }
    return ErrorToStatus(std::forward_as_tuple("When reading information for key '", key,
                                               "' in bucket '", bucket, "': "),
                         outcome.GetError());
  }
1345 
  // Convenience overload taking a parsed S3Path.
  Status IsEmptyDirectory(const S3Path& path, bool* out) {
    return IsEmptyDirectory(path.bucket, path.key, out);
  }
1349 
IsNonEmptyDirectory(const S3Path & path,bool * out)1350   Status IsNonEmptyDirectory(const S3Path& path, bool* out) {
1351     S3Model::ListObjectsV2Request req;
1352     req.SetBucket(ToAwsString(path.bucket));
1353     req.SetPrefix(ToAwsString(path.key) + kSep);
1354     req.SetDelimiter(Aws::String() + kSep);
1355     req.SetMaxKeys(1);
1356     auto outcome = client_->ListObjectsV2(req);
1357     if (outcome.IsSuccess()) {
1358       *out = outcome.GetResult().GetKeyCount() > 0;
1359       return Status::OK();
1360     }
1361     if (IsNotFound(outcome.GetError())) {
1362       *out = false;
1363       return Status::OK();
1364     }
1365     return ErrorToStatus(
1366         std::forward_as_tuple("When listing objects under key '", path.key,
1367                               "' in bucket '", path.bucket, "': "),
1368         outcome.GetError());
1369   }
1370 
CheckNestingDepth(int32_t nesting_depth)1371   Status CheckNestingDepth(int32_t nesting_depth) {
1372     if (nesting_depth >= kMaxNestingDepth) {
1373       return Status::IOError("S3 filesystem tree exceeds maximum nesting depth (",
1374                              kMaxNestingDepth, ")");
1375     }
1376     return Status::OK();
1377   }
1378 
  // Workhorse for GetTargetStats(FileSelector...)
  //
  // Walks the tree under bucket/key with TreeWalker and converts each
  // listing entry into a FileInfo appended to `out`.  Results are sorted
  // by path before returning.
  Status Walk(const FileSelector& select, const std::string& bucket,
              const std::string& key, std::vector<FileInfo>* out) {
    // Whether the walk yielded any entry at all (used below to distinguish
    // an empty "directory" from a nonexistent path).
    bool is_empty = true;

    auto handle_error = [&](const AWSError<S3Errors>& error) -> Status {
      if (select.allow_not_found && IsNotFound(error)) {
        return Status::OK();
      }
      return ErrorToStatus(std::forward_as_tuple("When listing objects under key '", key,
                                                 "' in bucket '", bucket, "': "),
                           error);
    };

    // Recurse only within the selector's settings and the global depth cap.
    auto handle_recursion = [&](int32_t nesting_depth) -> Result<bool> {
      RETURN_NOT_OK(CheckNestingDepth(nesting_depth));
      return select.recursive && nesting_depth <= select.max_recursion;
    };

    auto handle_results = [&](const std::string& prefix,
                              const S3Model::ListObjectsV2Result& result) -> Status {
      // Walk "directories"
      // (NOTE: the loop variable shadows the lambda's `prefix` parameter)
      for (const auto& prefix : result.GetCommonPrefixes()) {
        is_empty = false;
        const auto child_key =
            internal::RemoveTrailingSlash(FromAwsString(prefix.GetPrefix()));
        std::stringstream child_path;
        child_path << bucket << kSep << child_key;
        FileInfo info;
        info.set_path(child_path.str());
        info.set_type(FileType::Directory);
        out->push_back(std::move(info));
      }
      // Walk "files"
      for (const auto& obj : result.GetContents()) {
        is_empty = false;
        FileInfo info;
        const auto child_key = internal::RemoveTrailingSlash(FromAwsString(obj.GetKey()));
        if (child_key == util::string_view(prefix)) {
          // Amazon can return the "directory" key itself as part of the results, skip
          continue;
        }
        std::stringstream child_path;
        child_path << bucket << kSep << child_key;
        info.set_path(child_path.str());
        FileObjectToInfo(obj, &info);
        out->push_back(std::move(info));
      }
      return Status::OK();
    };

    RETURN_NOT_OK(TreeWalker::Walk(client_.get(), bucket, key, kListObjectsMaxKeys,
                                   handle_results, handle_error, handle_recursion));

    // If no contents were found, perhaps it's an empty "directory",
    // or perhaps it's a nonexistent entry.  Check.
    if (is_empty && !select.allow_not_found) {
      RETURN_NOT_OK(IsEmptyDirectory(bucket, key, &is_empty));
      if (!is_empty) {
        return PathNotFound(bucket, key);
      }
    }
    // Sort results for convenience, since they can come massively out of order
    std::sort(out->begin(), out->end(), FileInfo::ByPath{});
    return Status::OK();
  }
1445 
  // Collect all file keys and "directory" (common prefix) keys under
  // bucket/key, recursing fully (subject only to the nesting depth cap).
  // Used to implement recursive directory deletion.
  Status WalkForDeleteDir(const std::string& bucket, const std::string& key,
                          std::vector<std::string>* file_keys,
                          std::vector<std::string>* dir_keys) {
    auto handle_results = [&](const std::string& prefix,
                              const S3Model::ListObjectsV2Result& result) -> Status {
      // Walk "files"
      file_keys->reserve(file_keys->size() + result.GetContents().size());
      for (const auto& obj : result.GetContents()) {
        file_keys->emplace_back(FromAwsString(obj.GetKey()));
      }
      // Walk "directories"
      dir_keys->reserve(dir_keys->size() + result.GetCommonPrefixes().size());
      for (const auto& prefix : result.GetCommonPrefixes()) {
        dir_keys->emplace_back(FromAwsString(prefix.GetPrefix()));
      }
      return Status::OK();
    };

    auto handle_error = [&](const AWSError<S3Errors>& error) -> Status {
      return ErrorToStatus(std::forward_as_tuple("When listing objects under key '", key,
                                                 "' in bucket '", bucket, "': "),
                           error);
    };

    // Always descend, only bounded by the nesting depth sanity check.
    auto handle_recursion = [&](int32_t nesting_depth) -> Result<bool> {
      RETURN_NOT_OK(CheckNestingDepth(nesting_depth));
      return true;  // Recurse
    };

    return TreeWalker::Walk(client_.get(), bucket, key, kListObjectsMaxKeys,
                            handle_results, handle_error, handle_recursion);
  }
1478 
1479   // Delete multiple objects at once
DeleteObjects(const std::string & bucket,const std::vector<std::string> & keys)1480   Status DeleteObjects(const std::string& bucket, const std::vector<std::string>& keys) {
1481     struct DeleteHandler {
1482       Future<> future = Future<>::Make();
1483 
1484       // Callback for DeleteObjectsAsync
1485       void operator()(const Aws::S3::S3Client*, const S3Model::DeleteObjectsRequest& req,
1486                       const S3Model::DeleteObjectsOutcome& outcome,
1487                       const std::shared_ptr<const Aws::Client::AsyncCallerContext>&) {
1488         if (!outcome.IsSuccess()) {
1489           future.MarkFinished(ErrorToStatus(outcome.GetError()));
1490           return;
1491         }
1492         // Also need to check per-key errors, even on successful outcome
1493         // See
1494         // https://docs.aws.amazon.com/fr_fr/AmazonS3/latest/API/multiobjectdeleteapi.html
1495         const auto& errors = outcome.GetResult().GetErrors();
1496         if (!errors.empty()) {
1497           std::stringstream ss;
1498           ss << "Got the following " << errors.size()
1499              << " errors when deleting objects in S3 bucket '" << req.GetBucket()
1500              << "':\n";
1501           for (const auto& error : errors) {
1502             ss << "- key '" << error.GetKey() << "': " << error.GetMessage() << "\n";
1503           }
1504           future.MarkFinished(Status::IOError(ss.str()));
1505         } else {
1506           future.MarkFinished();
1507         }
1508       }
1509     };
1510 
1511     const auto chunk_size = static_cast<size_t>(kMultipleDeleteMaxKeys);
1512     std::vector<DeleteHandler> delete_handlers;
1513     std::vector<Future<>*> futures;
1514     delete_handlers.reserve(keys.size() / chunk_size + 1);
1515     futures.reserve(delete_handlers.capacity());
1516 
1517     for (size_t start = 0; start < keys.size(); start += chunk_size) {
1518       S3Model::DeleteObjectsRequest req;
1519       S3Model::Delete del;
1520       for (size_t i = start; i < std::min(keys.size(), chunk_size); ++i) {
1521         del.AddObjects(S3Model::ObjectIdentifier().WithKey(ToAwsString(keys[i])));
1522       }
1523       req.SetBucket(ToAwsString(bucket));
1524       req.SetDelete(std::move(del));
1525       delete_handlers.emplace_back();
1526       futures.push_back(&delete_handlers.back().future);
1527       client_->DeleteObjectsAsync(req, delete_handlers.back());
1528     }
1529 
1530     WaitForAll(futures);
1531     for (const auto* fut : futures) {
1532       RETURN_NOT_OK(fut->status());
1533     }
1534     return Status::OK();
1535   }
1536 
DeleteDirContents(const std::string & bucket,const std::string & key)1537   Status DeleteDirContents(const std::string& bucket, const std::string& key) {
1538     std::vector<std::string> file_keys;
1539     std::vector<std::string> dir_keys;
1540     RETURN_NOT_OK(WalkForDeleteDir(bucket, key, &file_keys, &dir_keys));
1541     if (file_keys.empty() && dir_keys.empty() && !key.empty()) {
1542       // No contents found, is it an empty directory?
1543       bool exists = false;
1544       RETURN_NOT_OK(IsEmptyDirectory(bucket, key, &exists));
1545       if (!exists) {
1546         return PathNotFound(bucket, key);
1547       }
1548     }
1549     // First delete all "files", then delete all child "directories"
1550     RETURN_NOT_OK(DeleteObjects(bucket, file_keys));
1551     // Delete directories in reverse lexicographic order, to ensure children
1552     // are deleted before their parents (Minio).
1553     std::sort(dir_keys.rbegin(), dir_keys.rend());
1554     return DeleteObjects(bucket, dir_keys);
1555   }
1556 
EnsureDirectoryExists(const S3Path & path)1557   Status EnsureDirectoryExists(const S3Path& path) {
1558     if (!path.key.empty()) {
1559       return CreateEmptyDir(path.bucket, path.key);
1560     }
1561     return Status::OK();
1562   }
1563 
EnsureParentExists(const S3Path & path)1564   Status EnsureParentExists(const S3Path& path) {
1565     if (path.has_parent()) {
1566       return EnsureDirectoryExists(path.parent());
1567     }
1568     return Status::OK();
1569   }
1570 
ListBuckets(std::vector<std::string> * out)1571   Status ListBuckets(std::vector<std::string>* out) {
1572     out->clear();
1573     auto outcome = client_->ListBuckets();
1574     if (!outcome.IsSuccess()) {
1575       return ErrorToStatus(std::forward_as_tuple("When listing buckets: "),
1576                            outcome.GetError());
1577     }
1578     for (const auto& bucket : outcome.GetResult().GetBuckets()) {
1579       out->emplace_back(FromAwsString(bucket.GetName()));
1580     }
1581     return Status::OK();
1582   }
1583 
OpenInputFile(const std::string & s,S3FileSystem * fs)1584   Result<std::shared_ptr<ObjectInputFile>> OpenInputFile(const std::string& s,
1585                                                          S3FileSystem* fs) {
1586     ARROW_ASSIGN_OR_RAISE(auto path, S3Path::FromString(s));
1587     RETURN_NOT_OK(ValidateFilePath(path));
1588 
1589     auto ptr =
1590         std::make_shared<ObjectInputFile>(fs->shared_from_this(), client_.get(), path);
1591     RETURN_NOT_OK(ptr->Init());
1592     return ptr;
1593   }
1594 
OpenInputFile(const FileInfo & info,S3FileSystem * fs)1595   Result<std::shared_ptr<ObjectInputFile>> OpenInputFile(const FileInfo& info,
1596                                                          S3FileSystem* fs) {
1597     if (info.type() == FileType::NotFound) {
1598       return ::arrow::fs::internal::PathNotFound(info.path());
1599     }
1600     if (info.type() != FileType::File && info.type() != FileType::Unknown) {
1601       return ::arrow::fs::internal::NotAFile(info.path());
1602     }
1603 
1604     ARROW_ASSIGN_OR_RAISE(auto path, S3Path::FromString(info.path()));
1605     RETURN_NOT_OK(ValidateFilePath(path));
1606 
1607     auto ptr = std::make_shared<ObjectInputFile>(fs->shared_from_this(), client_.get(),
1608                                                  path, info.size());
1609     RETURN_NOT_OK(ptr->Init());
1610     return ptr;
1611   }
1612 };
1613 
// Construct an S3FileSystem holding the given options; the heavyweight
// AWS client setup is deferred to Impl::Init(), invoked by S3FileSystem::Make().
S3FileSystem::S3FileSystem(const S3Options& options) : impl_(new Impl{options}) {}
1615 
// Out-of-line empty destructor, defined here where Impl is a complete type.
S3FileSystem::~S3FileSystem() {}
1617 
Make(const S3Options & options)1618 Result<std::shared_ptr<S3FileSystem>> S3FileSystem::Make(const S3Options& options) {
1619   RETURN_NOT_OK(CheckS3Initialized());
1620 
1621   std::shared_ptr<S3FileSystem> ptr(new S3FileSystem(options));
1622   RETURN_NOT_OK(ptr->impl_->Init());
1623   return ptr;
1624 }
1625 
Equals(const FileSystem & other) const1626 bool S3FileSystem::Equals(const FileSystem& other) const {
1627   if (this == &other) {
1628     return true;
1629   }
1630   if (other.type_name() != type_name()) {
1631     return false;
1632   }
1633   const auto& s3fs = ::arrow::internal::checked_cast<const S3FileSystem&>(other);
1634   return options().Equals(s3fs.options());
1635 }
1636 
// Return a copy of the options this filesystem was created with.
S3Options S3FileSystem::options() const { return impl_->options(); }
1638 
// Return the S3 region used by this filesystem (as reported by the Impl).
std::string S3FileSystem::region() const { return impl_->region(); }
1640 
// Get information about a single path.  The path may designate:
//  - the filesystem root ""        -> always a Directory
//  - a bucket (no key)             -> probed with HeadBucket
//  - an object or a "directory"    -> probed with HeadObject, then the
//    empty-/non-empty-directory heuristics, since S3 has no real directories.
Result<FileInfo> S3FileSystem::GetFileInfo(const std::string& s) {
  ARROW_ASSIGN_OR_RAISE(auto path, S3Path::FromString(s));
  FileInfo info;
  info.set_path(s);

  if (path.empty()) {
    // It's the root path ""
    info.set_type(FileType::Directory);
    return info;
  } else if (path.key.empty()) {
    // It's a bucket
    S3Model::HeadBucketRequest req;
    req.SetBucket(ToAwsString(path.bucket));

    auto outcome = impl_->client_->HeadBucket(req);
    if (!outcome.IsSuccess()) {
      if (!IsNotFound(outcome.GetError())) {
        // A real error (permissions, connectivity, ...) -> propagate it
        return ErrorToStatus(
            std::forward_as_tuple("When getting information for bucket '", path.bucket,
                                  "': "),
            outcome.GetError());
      }
      info.set_type(FileType::NotFound);
      return info;
    }
    // NOTE: S3 doesn't have a bucket modification time.  Only a creation
    // time is available, and you have to list all buckets to get it.
    info.set_type(FileType::Directory);
    return info;
  } else {
    // It's an object
    S3Model::HeadObjectRequest req;
    req.SetBucket(ToAwsString(path.bucket));
    req.SetKey(ToAwsString(path.key));

    auto outcome = impl_->client_->HeadObject(req);
    if (outcome.IsSuccess()) {
      // "File" object found
      FileObjectToInfo(outcome.GetResult(), &info);
      return info;
    }
    if (!IsNotFound(outcome.GetError())) {
      // A real error (permissions, connectivity, ...) -> propagate it
      return ErrorToStatus(
          std::forward_as_tuple("When getting information for key '", path.key,
                                "' in bucket '", path.bucket, "': "),
          outcome.GetError());
    }
    // Not found => perhaps it's an empty "directory"
    bool is_dir = false;
    RETURN_NOT_OK(impl_->IsEmptyDirectory(path, &is_dir));
    if (is_dir) {
      info.set_type(FileType::Directory);
      return info;
    }
    // Not found => perhaps it's a non-empty "directory"
    RETURN_NOT_OK(impl_->IsNonEmptyDirectory(path, &is_dir));
    if (is_dir) {
      info.set_type(FileType::Directory);
    } else {
      info.set_type(FileType::NotFound);
    }
    return info;
  }
}
1705 
GetFileInfo(const FileSelector & select)1706 Result<std::vector<FileInfo>> S3FileSystem::GetFileInfo(const FileSelector& select) {
1707   ARROW_ASSIGN_OR_RAISE(auto base_path, S3Path::FromString(select.base_dir));
1708 
1709   std::vector<FileInfo> results;
1710 
1711   if (base_path.empty()) {
1712     // List all buckets
1713     std::vector<std::string> buckets;
1714     RETURN_NOT_OK(impl_->ListBuckets(&buckets));
1715     for (const auto& bucket : buckets) {
1716       FileInfo info;
1717       info.set_path(bucket);
1718       info.set_type(FileType::Directory);
1719       results.push_back(std::move(info));
1720       if (select.recursive) {
1721         RETURN_NOT_OK(impl_->Walk(select, bucket, "", &results));
1722       }
1723     }
1724     return results;
1725   }
1726 
1727   // Nominal case -> walk a single bucket
1728   RETURN_NOT_OK(impl_->Walk(select, base_path.bucket, base_path.key, &results));
1729   return results;
1730 }
1731 
CreateDir(const std::string & s,bool recursive)1732 Status S3FileSystem::CreateDir(const std::string& s, bool recursive) {
1733   ARROW_ASSIGN_OR_RAISE(auto path, S3Path::FromString(s));
1734 
1735   if (path.key.empty()) {
1736     // Create bucket
1737     return impl_->CreateBucket(path.bucket);
1738   }
1739 
1740   // Create object
1741   if (recursive) {
1742     // Ensure bucket exists
1743     RETURN_NOT_OK(impl_->CreateBucket(path.bucket));
1744     // Ensure that all parents exist, then the directory itself
1745     std::string parent_key;
1746     for (const auto& part : path.key_parts) {
1747       parent_key += part;
1748       parent_key += kSep;
1749       RETURN_NOT_OK(impl_->CreateEmptyObject(path.bucket, parent_key));
1750     }
1751     return Status::OK();
1752   } else {
1753     // Check parent dir exists
1754     if (path.has_parent()) {
1755       S3Path parent_path = path.parent();
1756       bool exists;
1757       RETURN_NOT_OK(impl_->IsNonEmptyDirectory(parent_path, &exists));
1758       if (!exists) {
1759         RETURN_NOT_OK(impl_->IsEmptyDirectory(parent_path, &exists));
1760       }
1761       if (!exists) {
1762         return Status::IOError("Cannot create directory '", path.full_path,
1763                                "': parent directory does not exist");
1764       }
1765     }
1766 
1767     // XXX Should we check that no non-directory entry exists?
1768     // Minio does it for us, not sure about other S3 implementations.
1769     return impl_->CreateEmptyDir(path.bucket, path.key);
1770   }
1771 }
1772 
DeleteDir(const std::string & s)1773 Status S3FileSystem::DeleteDir(const std::string& s) {
1774   ARROW_ASSIGN_OR_RAISE(auto path, S3Path::FromString(s));
1775 
1776   if (path.empty()) {
1777     return Status::NotImplemented("Cannot delete all S3 buckets");
1778   }
1779   RETURN_NOT_OK(impl_->DeleteDirContents(path.bucket, path.key));
1780   if (path.key.empty()) {
1781     // Delete bucket
1782     S3Model::DeleteBucketRequest req;
1783     req.SetBucket(ToAwsString(path.bucket));
1784     return OutcomeToStatus(
1785         std::forward_as_tuple("When deleting bucket '", path.bucket, "': "),
1786         impl_->client_->DeleteBucket(req));
1787   } else {
1788     // Delete "directory"
1789     RETURN_NOT_OK(impl_->DeleteObject(path.bucket, path.key + kSep));
1790     // Parent may be implicitly deleted if it became empty, recreate it
1791     return impl_->EnsureParentExists(path);
1792   }
1793 }
1794 
// Delete everything below a "directory" while keeping the directory itself.
// Refuses the root path (would mean wiping every bucket).
Status S3FileSystem::DeleteDirContents(const std::string& s) {
  ARROW_ASSIGN_OR_RAISE(auto path, S3Path::FromString(s));

  if (path.empty()) {
    return Status::NotImplemented("Cannot delete all S3 buckets");
  }
  RETURN_NOT_OK(impl_->DeleteDirContents(path.bucket, path.key));
  // Directory may be implicitly deleted, recreate it
  return impl_->EnsureDirectoryExists(path);
}
1805 
// Deleting the contents of the root (i.e. every bucket) is deliberately
// unsupported, consistent with DeleteDir's handling of the root path.
Status S3FileSystem::DeleteRootDirContents() {
  return Status::NotImplemented("Cannot delete all S3 buckets");
}
1809 
DeleteFile(const std::string & s)1810 Status S3FileSystem::DeleteFile(const std::string& s) {
1811   ARROW_ASSIGN_OR_RAISE(auto path, S3Path::FromString(s));
1812   RETURN_NOT_OK(ValidateFilePath(path));
1813 
1814   // Check the object exists
1815   S3Model::HeadObjectRequest req;
1816   req.SetBucket(ToAwsString(path.bucket));
1817   req.SetKey(ToAwsString(path.key));
1818 
1819   auto outcome = impl_->client_->HeadObject(req);
1820   if (!outcome.IsSuccess()) {
1821     if (IsNotFound(outcome.GetError())) {
1822       return PathNotFound(path);
1823     } else {
1824       return ErrorToStatus(
1825           std::forward_as_tuple("When getting information for key '", path.key,
1826                                 "' in bucket '", path.bucket, "': "),
1827           outcome.GetError());
1828     }
1829   }
1830   // Object found, delete it
1831   RETURN_NOT_OK(impl_->DeleteObject(path.bucket, path.key));
1832   // Parent may be implicitly deleted if it became empty, recreate it
1833   return impl_->EnsureParentExists(path);
1834 }
1835 
Move(const std::string & src,const std::string & dest)1836 Status S3FileSystem::Move(const std::string& src, const std::string& dest) {
1837   // XXX We don't implement moving directories as it would be too expensive:
1838   // one must copy all directory contents one by one (including object data),
1839   // then delete the original contents.
1840 
1841   ARROW_ASSIGN_OR_RAISE(auto src_path, S3Path::FromString(src));
1842   RETURN_NOT_OK(ValidateFilePath(src_path));
1843   ARROW_ASSIGN_OR_RAISE(auto dest_path, S3Path::FromString(dest));
1844   RETURN_NOT_OK(ValidateFilePath(dest_path));
1845 
1846   if (src_path == dest_path) {
1847     return Status::OK();
1848   }
1849   RETURN_NOT_OK(impl_->CopyObject(src_path, dest_path));
1850   RETURN_NOT_OK(impl_->DeleteObject(src_path.bucket, src_path.key));
1851   // Source parent may be implicitly deleted if it became empty, recreate it
1852   return impl_->EnsureParentExists(src_path);
1853 }
1854 
CopyFile(const std::string & src,const std::string & dest)1855 Status S3FileSystem::CopyFile(const std::string& src, const std::string& dest) {
1856   ARROW_ASSIGN_OR_RAISE(auto src_path, S3Path::FromString(src));
1857   RETURN_NOT_OK(ValidateFilePath(src_path));
1858   ARROW_ASSIGN_OR_RAISE(auto dest_path, S3Path::FromString(dest));
1859   RETURN_NOT_OK(ValidateFilePath(dest_path));
1860 
1861   if (src_path == dest_path) {
1862     return Status::OK();
1863   }
1864   return impl_->CopyObject(src_path, dest_path);
1865 }
1866 
// Open a read-only stream on the object at path `s`.
// Delegates to Impl::OpenInputFile (ObjectInputFile also serves as a stream).
Result<std::shared_ptr<io::InputStream>> S3FileSystem::OpenInputStream(
    const std::string& s) {
  return impl_->OpenInputFile(s, this);
}
1871 
// Open a read-only stream using a previously-obtained FileInfo, which lets
// Impl::OpenInputFile reuse the known size and type.
Result<std::shared_ptr<io::InputStream>> S3FileSystem::OpenInputStream(
    const FileInfo& info) {
  return impl_->OpenInputFile(info, this);
}
1876 
// Open a random-access reader on the object at path `s`.
Result<std::shared_ptr<io::RandomAccessFile>> S3FileSystem::OpenInputFile(
    const std::string& s) {
  return impl_->OpenInputFile(s, this);
}
1881 
// Open a random-access reader using a previously-obtained FileInfo, which
// lets Impl::OpenInputFile reuse the known size and type.
Result<std::shared_ptr<io::RandomAccessFile>> S3FileSystem::OpenInputFile(
    const FileInfo& info) {
  return impl_->OpenInputFile(info, this);
}
1886 
OpenOutputStream(const std::string & s)1887 Result<std::shared_ptr<io::OutputStream>> S3FileSystem::OpenOutputStream(
1888     const std::string& s) {
1889   ARROW_ASSIGN_OR_RAISE(auto path, S3Path::FromString(s));
1890   RETURN_NOT_OK(ValidateFilePath(path));
1891 
1892   auto ptr = std::make_shared<ObjectOutputStream>(
1893       shared_from_this(), impl_->client_.get(), path, impl_->options());
1894   RETURN_NOT_OK(ptr->Init());
1895   return ptr;
1896 }
1897 
// Appending to S3 objects is not supported; see the inline notes on a
// possible (but unimplemented) multipart-copy-based approach.
Result<std::shared_ptr<io::OutputStream>> S3FileSystem::OpenAppendStream(
    const std::string& path) {
  // XXX Investigate UploadPartCopy? Does it work with source == destination?
  // https://docs.aws.amazon.com/AmazonS3/latest/API/mpUploadUploadPartCopy.html
  // (but would need to fall back to GET if the current data is < 5 MB)
  return Status::NotImplemented("It is not possible to append efficiently to S3 objects");
}
1905 
1906 //
1907 // Top-level utility functions
1908 //
1909 
// Resolve the region a bucket resides in, using the default RegionResolver
// instance.
Result<std::string> ResolveBucketRegion(const std::string& bucket) {
  ARROW_ASSIGN_OR_RAISE(auto resolver, RegionResolver::DefaultInstance());
  return resolver->ResolveRegion(bucket);
}
1914 
1915 }  // namespace fs
1916 }  // namespace arrow
1917