1 // Licensed to the Apache Software Foundation (ASF) under one
2 // or more contributor license agreements.  See the NOTICE file
3 // distributed with this work for additional information
4 // regarding copyright ownership.  The ASF licenses this file
5 // to you under the Apache License, Version 2.0 (the
6 // "License"); you may not use this file except in compliance
7 // with the License.  You may obtain a copy of the License at
8 //
9 //   http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing,
12 // software distributed under the License is distributed on an
13 // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14 // KIND, either express or implied.  See the License for the
15 // specific language governing permissions and limitations
16 // under the License.
17 
18 #include "arrow/filesystem/s3fs.h"
19 
20 #include <algorithm>
21 #include <atomic>
22 #include <chrono>
23 #include <condition_variable>
24 #include <functional>
25 #include <memory>
26 #include <mutex>
27 #include <sstream>
28 #include <thread>
29 #include <unordered_map>
30 #include <utility>
31 
32 #ifdef _WIN32
33 // Undefine preprocessor macros that interfere with AWS function / method names
34 #ifdef GetMessage
35 #undef GetMessage
36 #endif
37 #ifdef GetObject
38 #undef GetObject
39 #endif
40 #endif
41 
42 #include <aws/core/Aws.h>
43 #include <aws/core/Region.h>
44 #include <aws/core/auth/AWSCredentials.h>
45 #include <aws/core/auth/AWSCredentialsProviderChain.h>
46 #include <aws/core/auth/STSCredentialsProvider.h>
47 #include <aws/core/client/DefaultRetryStrategy.h>
48 #include <aws/core/client/RetryStrategy.h>
49 #include <aws/core/http/HttpResponse.h>
50 #include <aws/core/utils/logging/ConsoleLogSystem.h>
51 #include <aws/core/utils/stream/PreallocatedStreamBuf.h>
52 #include <aws/core/utils/xml/XmlSerializer.h>
53 #include <aws/identity-management/auth/STSAssumeRoleCredentialsProvider.h>
54 #include <aws/s3/S3Client.h>
55 #include <aws/s3/model/AbortMultipartUploadRequest.h>
56 #include <aws/s3/model/CompleteMultipartUploadRequest.h>
57 #include <aws/s3/model/CompletedMultipartUpload.h>
58 #include <aws/s3/model/CompletedPart.h>
59 #include <aws/s3/model/CopyObjectRequest.h>
60 #include <aws/s3/model/CreateBucketRequest.h>
61 #include <aws/s3/model/CreateMultipartUploadRequest.h>
62 #include <aws/s3/model/DeleteBucketRequest.h>
63 #include <aws/s3/model/DeleteObjectRequest.h>
64 #include <aws/s3/model/DeleteObjectsRequest.h>
65 #include <aws/s3/model/GetObjectRequest.h>
66 #include <aws/s3/model/HeadBucketRequest.h>
67 #include <aws/s3/model/HeadObjectRequest.h>
68 #include <aws/s3/model/ListBucketsResult.h>
69 #include <aws/s3/model/ListObjectsV2Request.h>
70 #include <aws/s3/model/ObjectCannedACL.h>
71 #include <aws/s3/model/PutObjectRequest.h>
72 #include <aws/s3/model/UploadPartRequest.h>
73 
74 #include "arrow/util/windows_fixup.h"
75 
76 #include "arrow/buffer.h"
77 #include "arrow/filesystem/filesystem.h"
78 #include "arrow/filesystem/path_util.h"
79 #include "arrow/filesystem/s3_internal.h"
80 #include "arrow/filesystem/util_internal.h"
81 #include "arrow/io/interfaces.h"
82 #include "arrow/io/memory.h"
83 #include "arrow/io/util_internal.h"
84 #include "arrow/result.h"
85 #include "arrow/status.h"
86 #include "arrow/util/async_generator.h"
87 #include "arrow/util/atomic_shared_ptr.h"
88 #include "arrow/util/checked_cast.h"
89 #include "arrow/util/future.h"
90 #include "arrow/util/key_value_metadata.h"
91 #include "arrow/util/logging.h"
92 #include "arrow/util/optional.h"
93 #include "arrow/util/task_group.h"
94 #include "arrow/util/thread_pool.h"
95 
96 namespace arrow {
97 
98 using internal::TaskGroup;
99 using internal::Uri;
100 using io::internal::SubmitIO;
101 
102 namespace fs {
103 
104 using ::Aws::Client::AWSError;
105 using ::Aws::S3::S3Errors;
106 namespace S3Model = Aws::S3::Model;
107 
108 using internal::ConnectRetryStrategy;
109 using internal::DetectS3Backend;
110 using internal::ErrorToStatus;
111 using internal::FromAwsDatetime;
112 using internal::FromAwsString;
113 using internal::IsAlreadyExists;
114 using internal::IsNotFound;
115 using internal::OutcomeToResult;
116 using internal::OutcomeToStatus;
117 using internal::S3Backend;
118 using internal::ToAwsString;
119 using internal::ToURLEncodedAwsString;
120 
// Path separator used in "bucket/key" style S3 paths
static const char kSep = '/';
122 
namespace {

// Serializes AWS SDK initialization / finalization across threads
std::mutex aws_init_lock;
// SDK-wide options passed to Aws::InitAPI / Aws::ShutdownAPI
Aws::SDKOptions aws_options;
// Whether Aws::InitAPI has been called (and not yet shut down)
std::atomic<bool> aws_initialized(false);

// Perform the actual AWS SDK initialization.
// Precondition: the caller holds aws_init_lock.
Status DoInitializeS3(const S3GlobalOptions& options) {
  Aws::Utils::Logging::LogLevel aws_log_level;

// Map our S3LogLevel enumerators to the identically-named AWS SDK levels
#define LOG_LEVEL_CASE(level_name)                             \
  case S3LogLevel::level_name:                                 \
    aws_log_level = Aws::Utils::Logging::LogLevel::level_name; \
    break;

  switch (options.log_level) {
    LOG_LEVEL_CASE(Fatal)
    LOG_LEVEL_CASE(Error)
    LOG_LEVEL_CASE(Warn)
    LOG_LEVEL_CASE(Info)
    LOG_LEVEL_CASE(Debug)
    LOG_LEVEL_CASE(Trace)
    default:
      aws_log_level = Aws::Utils::Logging::LogLevel::Off;
  }

#undef LOG_LEVEL_CASE

  aws_options.loggingOptions.logLevel = aws_log_level;
  // By default the AWS SDK logs to files, log to console instead
  aws_options.loggingOptions.logger_create_fn = [] {
    return std::make_shared<Aws::Utils::Logging::ConsoleLogSystem>(
        aws_options.loggingOptions.logLevel);
  };
  Aws::InitAPI(aws_options);
  aws_initialized.store(true);
  return Status::OK();
}

}  // namespace
162 
InitializeS3(const S3GlobalOptions & options)163 Status InitializeS3(const S3GlobalOptions& options) {
164   std::lock_guard<std::mutex> lock(aws_init_lock);
165   return DoInitializeS3(options);
166 }
167 
FinalizeS3()168 Status FinalizeS3() {
169   std::lock_guard<std::mutex> lock(aws_init_lock);
170   Aws::ShutdownAPI(aws_options);
171   aws_initialized.store(false);
172   return Status::OK();
173 }
174 
EnsureS3Initialized()175 Status EnsureS3Initialized() {
176   std::lock_guard<std::mutex> lock(aws_init_lock);
177   if (!aws_initialized.load()) {
178     S3GlobalOptions options{S3LogLevel::Fatal};
179     return DoInitializeS3(options);
180   }
181   return Status::OK();
182 }
183 
184 // -----------------------------------------------------------------------
185 // S3ProxyOptions implementation
186 
FromUri(const Uri & uri)187 Result<S3ProxyOptions> S3ProxyOptions::FromUri(const Uri& uri) {
188   S3ProxyOptions options;
189 
190   options.scheme = uri.scheme();
191   options.host = uri.host();
192   options.port = uri.port();
193   options.username = uri.username();
194   options.password = uri.password();
195 
196   return options;
197 }
198 
FromUri(const std::string & uri_string)199 Result<S3ProxyOptions> S3ProxyOptions::FromUri(const std::string& uri_string) {
200   Uri uri;
201   RETURN_NOT_OK(uri.Parse(uri_string));
202   return FromUri(uri);
203 }
204 
Equals(const S3ProxyOptions & other) const205 bool S3ProxyOptions::Equals(const S3ProxyOptions& other) const {
206   return (scheme == other.scheme && host == other.host && port == other.port &&
207           username == other.username && password == other.password);
208 }
209 
210 // -----------------------------------------------------------------------
211 // S3Options implementation
212 
ConfigureDefaultCredentials()213 void S3Options::ConfigureDefaultCredentials() {
214   credentials_provider =
215       std::make_shared<Aws::Auth::DefaultAWSCredentialsProviderChain>();
216   credentials_kind = S3CredentialsKind::Default;
217 }
218 
ConfigureAnonymousCredentials()219 void S3Options::ConfigureAnonymousCredentials() {
220   credentials_provider = std::make_shared<Aws::Auth::AnonymousAWSCredentialsProvider>();
221   credentials_kind = S3CredentialsKind::Anonymous;
222 }
223 
ConfigureAccessKey(const std::string & access_key,const std::string & secret_key,const std::string & session_token)224 void S3Options::ConfigureAccessKey(const std::string& access_key,
225                                    const std::string& secret_key,
226                                    const std::string& session_token) {
227   credentials_provider = std::make_shared<Aws::Auth::SimpleAWSCredentialsProvider>(
228       ToAwsString(access_key), ToAwsString(secret_key), ToAwsString(session_token));
229   credentials_kind = S3CredentialsKind::Explicit;
230 }
231 
ConfigureAssumeRoleCredentials(const std::string & role_arn,const std::string & session_name,const std::string & external_id,int load_frequency,const std::shared_ptr<Aws::STS::STSClient> & stsClient)232 void S3Options::ConfigureAssumeRoleCredentials(
233     const std::string& role_arn, const std::string& session_name,
234     const std::string& external_id, int load_frequency,
235     const std::shared_ptr<Aws::STS::STSClient>& stsClient) {
236   credentials_provider = std::make_shared<Aws::Auth::STSAssumeRoleCredentialsProvider>(
237       ToAwsString(role_arn), ToAwsString(session_name), ToAwsString(external_id),
238       load_frequency, stsClient);
239   credentials_kind = S3CredentialsKind::Role;
240 }
241 
ConfigureAssumeRoleWithWebIdentityCredentials()242 void S3Options::ConfigureAssumeRoleWithWebIdentityCredentials() {
243   // The AWS SDK uses environment variables AWS_DEFAULT_REGION,
244   // AWS_ROLE_ARN, AWS_WEB_IDENTITY_TOKEN_FILE and AWS_ROLE_SESSION_NAME
245   // to configure the required credentials
246   credentials_provider =
247       std::make_shared<Aws::Auth::STSAssumeRoleWebIdentityCredentialsProvider>();
248   credentials_kind = S3CredentialsKind::WebIdentity;
249 }
250 
GetAccessKey() const251 std::string S3Options::GetAccessKey() const {
252   auto credentials = credentials_provider->GetAWSCredentials();
253   return std::string(FromAwsString(credentials.GetAWSAccessKeyId()));
254 }
255 
GetSecretKey() const256 std::string S3Options::GetSecretKey() const {
257   auto credentials = credentials_provider->GetAWSCredentials();
258   return std::string(FromAwsString(credentials.GetAWSSecretKey()));
259 }
260 
GetSessionToken() const261 std::string S3Options::GetSessionToken() const {
262   auto credentials = credentials_provider->GetAWSCredentials();
263   return std::string(FromAwsString(credentials.GetSessionToken()));
264 }
265 
Defaults()266 S3Options S3Options::Defaults() {
267   S3Options options;
268   options.ConfigureDefaultCredentials();
269   return options;
270 }
271 
Anonymous()272 S3Options S3Options::Anonymous() {
273   S3Options options;
274   options.ConfigureAnonymousCredentials();
275   return options;
276 }
277 
FromAccessKey(const std::string & access_key,const std::string & secret_key,const std::string & session_token)278 S3Options S3Options::FromAccessKey(const std::string& access_key,
279                                    const std::string& secret_key,
280                                    const std::string& session_token) {
281   S3Options options;
282   options.ConfigureAccessKey(access_key, secret_key, session_token);
283   return options;
284 }
285 
FromAssumeRole(const std::string & role_arn,const std::string & session_name,const std::string & external_id,int load_frequency,const std::shared_ptr<Aws::STS::STSClient> & stsClient)286 S3Options S3Options::FromAssumeRole(
287     const std::string& role_arn, const std::string& session_name,
288     const std::string& external_id, int load_frequency,
289     const std::shared_ptr<Aws::STS::STSClient>& stsClient) {
290   S3Options options;
291   options.role_arn = role_arn;
292   options.session_name = session_name;
293   options.external_id = external_id;
294   options.load_frequency = load_frequency;
295   options.ConfigureAssumeRoleCredentials(role_arn, session_name, external_id,
296                                          load_frequency, stsClient);
297   return options;
298 }
299 
FromAssumeRoleWithWebIdentity()300 S3Options S3Options::FromAssumeRoleWithWebIdentity() {
301   S3Options options;
302   options.ConfigureAssumeRoleWithWebIdentityCredentials();
303   return options;
304 }
305 
FromUri(const Uri & uri,std::string * out_path)306 Result<S3Options> S3Options::FromUri(const Uri& uri, std::string* out_path) {
307   S3Options options;
308 
309   const auto bucket = uri.host();
310   auto path = uri.path();
311   if (bucket.empty()) {
312     if (!path.empty()) {
313       return Status::Invalid("Missing bucket name in S3 URI");
314     }
315   } else {
316     if (path.empty()) {
317       path = bucket;
318     } else {
319       if (path[0] != '/') {
320         return Status::Invalid("S3 URI should absolute, not relative");
321       }
322       path = bucket + path;
323     }
324   }
325   if (out_path != nullptr) {
326     *out_path = std::string(internal::RemoveTrailingSlash(path));
327   }
328 
329   std::unordered_map<std::string, std::string> options_map;
330   ARROW_ASSIGN_OR_RAISE(const auto options_items, uri.query_items());
331   for (const auto& kv : options_items) {
332     options_map.emplace(kv.first, kv.second);
333   }
334 
335   const auto username = uri.username();
336   if (!username.empty()) {
337     options.ConfigureAccessKey(username, uri.password());
338   } else {
339     options.ConfigureDefaultCredentials();
340   }
341 
342   bool region_set = false;
343   for (const auto& kv : options_map) {
344     if (kv.first == "region") {
345       options.region = kv.second;
346       region_set = true;
347     } else if (kv.first == "scheme") {
348       options.scheme = kv.second;
349     } else if (kv.first == "endpoint_override") {
350       options.endpoint_override = kv.second;
351     } else {
352       return Status::Invalid("Unexpected query parameter in S3 URI: '", kv.first, "'");
353     }
354   }
355 
356   if (!region_set && !bucket.empty() && options.endpoint_override.empty()) {
357     // XXX Should we use a dedicated resolver with the given credentials?
358     ARROW_ASSIGN_OR_RAISE(options.region, ResolveBucketRegion(bucket));
359   }
360 
361   return options;
362 }
363 
FromUri(const std::string & uri_string,std::string * out_path)364 Result<S3Options> S3Options::FromUri(const std::string& uri_string,
365                                      std::string* out_path) {
366   Uri uri;
367   RETURN_NOT_OK(uri.Parse(uri_string));
368   return FromUri(uri, out_path);
369 }
370 
Equals(const S3Options & other) const371 bool S3Options::Equals(const S3Options& other) const {
372   return (region == other.region && endpoint_override == other.endpoint_override &&
373           scheme == other.scheme && background_writes == other.background_writes &&
374           credentials_kind == other.credentials_kind &&
375           proxy_options.Equals(other.proxy_options) &&
376           GetAccessKey() == other.GetAccessKey() &&
377           GetSecretKey() == other.GetSecretKey() &&
378           GetSessionToken() == other.GetSessionToken());
379 }
380 
381 namespace {
382 
CheckS3Initialized()383 Status CheckS3Initialized() {
384   if (!aws_initialized.load()) {
385     return Status::Invalid(
386         "S3 subsystem not initialized; please call InitializeS3() "
387         "before carrying out any S3-related operation");
388   }
389   return Status::OK();
390 }
391 
392 // XXX Sanitize paths by removing leading slash?
393 
// A parsed S3 path, decomposed into bucket name and object key.
struct S3Path {
  std::string full_path;               // original path, minus any trailing slash
  std::string bucket;                  // bucket name (first path component)
  std::string key;                     // object key; empty for a bucket-only path
  std::vector<std::string> key_parts;  // `key` split on the '/' separator

  // Parse "bucket[/key...]" into an S3Path.  A trailing slash is stripped;
  // a leading slash is invalid.
  static Result<S3Path> FromString(const std::string& s) {
    const auto src = internal::RemoveTrailingSlash(s);
    auto first_sep = src.find_first_of(kSep);
    if (first_sep == 0) {
      return Status::Invalid("Path cannot start with a separator ('", s, "')");
    }
    if (first_sep == std::string::npos) {
      // No separator: the whole path is a bucket name
      return S3Path{std::string(src), std::string(src), "", {}};
    }
    S3Path path;
    path.full_path = std::string(src);
    path.bucket = std::string(src.substr(0, first_sep));
    path.key = std::string(src.substr(first_sep + 1));
    path.key_parts = internal::SplitAbstractPath(path.key);
    RETURN_NOT_OK(Validate(&path));
    return path;
  }

  // Validate the key components (e.g. reject empty components),
  // adding the offending path to the error message.
  static Status Validate(const S3Path* path) {
    auto result = internal::ValidateAbstractPathParts(path->key_parts);
    if (!result.ok()) {
      return Status::Invalid(result.message(), " in path ", path->full_path);
    } else {
      return result;
    }
  }

  // Return "bucket/key" as an Aws::String (no URL encoding)
  Aws::String ToAwsString() const {
    Aws::String res(bucket.begin(), bucket.end());
    res.reserve(bucket.size() + key.size() + 1);
    res += kSep;
    res.append(key.begin(), key.end());
    return res;
  }

  // Return "bucket/part1/part2/..." with each component URL-encoded
  Aws::String ToURLEncodedAwsString() const {
    // URL-encode individual parts, not the '/' separator
    Aws::String res;
    res += internal::ToURLEncodedAwsString(bucket);
    for (const auto& part : key_parts) {
      res += kSep;
      res += internal::ToURLEncodedAwsString(part);
    }
    return res;
  }

  // Return the parent path; must only be called when key_parts is non-empty
  S3Path parent() const {
    DCHECK(!key_parts.empty());
    auto parent = S3Path{"", bucket, "", key_parts};
    parent.key_parts.pop_back();
    parent.key = internal::JoinAbstractPath(parent.key_parts);
    parent.full_path = parent.bucket + kSep + parent.key;
    return parent;
  }

  // Whether the path has a parent, i.e. it is not a bucket-only path
  bool has_parent() const { return !key.empty(); }

  bool empty() const { return bucket.empty() && key.empty(); }

  // full_path is intentionally not compared (it is derived from bucket + key)
  bool operator==(const S3Path& other) const {
    return bucket == other.bucket && key == other.key;
  }
};
463 
464 // XXX return in OutcomeToStatus instead?
PathNotFound(const S3Path & path)465 Status PathNotFound(const S3Path& path) {
466   return ::arrow::fs::internal::PathNotFound(path.full_path);
467 }
468 
PathNotFound(const std::string & bucket,const std::string & key)469 Status PathNotFound(const std::string& bucket, const std::string& key) {
470   return ::arrow::fs::internal::PathNotFound(bucket + kSep + key);
471 }
472 
NotAFile(const S3Path & path)473 Status NotAFile(const S3Path& path) {
474   return ::arrow::fs::internal::NotAFile(path.full_path);
475 }
476 
ValidateFilePath(const S3Path & path)477 Status ValidateFilePath(const S3Path& path) {
478   if (path.bucket.empty() || path.key.empty()) {
479     return NotAFile(path);
480   }
481   return Status::OK();
482 }
483 
// Format a HTTP "Range" header value for `length` bytes starting at `start`
// (byte positions are inclusive on both ends).
std::string FormatRange(int64_t start, int64_t length) {
  const int64_t last = start + length - 1;
  std::string header = "bytes=";
  header += std::to_string(start);
  header += '-';
  header += std::to_string(last);
  return header;
}
490 
491 // An AWS RetryStrategy that wraps a provided arrow::fs::S3RetryStrategy
// An AWS RetryStrategy that wraps a provided arrow::fs::S3RetryStrategy,
// translating AWS SDK error objects into the SDK-agnostic detail struct.
class WrappedRetryStrategy : public Aws::Client::RetryStrategy {
 public:
  explicit WrappedRetryStrategy(const std::shared_ptr<S3RetryStrategy>& s3_retry_strategy)
      : s3_retry_strategy_(s3_retry_strategy) {}

  // Delegate the retry decision to the wrapped strategy
  bool ShouldRetry(const Aws::Client::AWSError<Aws::Client::CoreErrors>& error,
                   long attempted_retries) const override {  // NOLINT runtime/int
    S3RetryStrategy::AWSErrorDetail detail = ErrorToDetail(error);
    return s3_retry_strategy_->ShouldRetry(detail,
                                           static_cast<int64_t>(attempted_retries));
  }

  // Delegate computation of the retry backoff delay to the wrapped strategy
  long CalculateDelayBeforeNextRetry(  // NOLINT runtime/int
      const Aws::Client::AWSError<Aws::Client::CoreErrors>& error,
      long attempted_retries) const override {  // NOLINT runtime/int
    S3RetryStrategy::AWSErrorDetail detail = ErrorToDetail(error);
    return static_cast<long>(  // NOLINT runtime/int
        s3_retry_strategy_->CalculateDelayBeforeNextRetry(
            detail, static_cast<int64_t>(attempted_retries)));
  }

 private:
  // Convert an AWS SDK error into the SDK-agnostic detail struct consumed
  // by S3RetryStrategy implementations
  template <typename ErrorType>
  static S3RetryStrategy::AWSErrorDetail ErrorToDetail(
      const Aws::Client::AWSError<ErrorType>& error) {
    S3RetryStrategy::AWSErrorDetail detail;
    detail.error_type = static_cast<int>(error.GetErrorType());
    detail.message = std::string(FromAwsString(error.GetMessage()));
    detail.exception_name = std::string(FromAwsString(error.GetExceptionName()));
    detail.should_retry = error.ShouldRetry();
    return detail;
  }

  std::shared_ptr<S3RetryStrategy> s3_retry_strategy_;
};
527 
// Subclass of the AWS SDK's S3Client, adding region detection and a
// workaround for errors embedded in 200 OK multipart-upload responses.
class S3Client : public Aws::S3::S3Client {
 public:
  using Aws::S3::S3Client::S3Client;

  // To get a bucket's region, we must extract the "x-amz-bucket-region" header
  // from the response to a HEAD bucket request.
  // Unfortunately, the S3Client APIs don't let us access the headers of successful
  // responses.  So we have to cook a AWS request and issue it ourselves.

  Result<std::string> GetBucketRegion(const S3Model::HeadBucketRequest& request) {
    auto uri = GeneratePresignedUrl(request.GetBucket(),
                                    /*key=*/"", Aws::Http::HttpMethod::HTTP_HEAD);
    // NOTE: The signer region argument isn't passed here, as there's no easy
    // way of computing it (the relevant method is private).
    auto outcome = MakeRequest(uri, request, Aws::Http::HttpMethod::HTTP_HEAD,
                               Aws::Auth::SIGV4_SIGNER);
    // The region header may appear on both successful and error responses,
    // so look at whichever headers we got
    const auto code = outcome.IsSuccess() ? outcome.GetResult().GetResponseCode()
                                          : outcome.GetError().GetResponseCode();
    const auto& headers = outcome.IsSuccess()
                              ? outcome.GetResult().GetHeaderValueCollection()
                              : outcome.GetError().GetResponseHeaders();

    const auto it = headers.find(ToAwsString("x-amz-bucket-region"));
    if (it == headers.end()) {
      if (code == Aws::Http::HttpResponseCode::NOT_FOUND) {
        return Status::IOError("Bucket '", request.GetBucket(), "' not found");
      } else if (!outcome.IsSuccess()) {
        return ErrorToStatus(std::forward_as_tuple("When resolving region for bucket '",
                                                   request.GetBucket(), "': "),
                             outcome.GetError());
      } else {
        return Status::IOError("When resolving region for bucket '", request.GetBucket(),
                               "': missing 'x-amz-bucket-region' header in response");
      }
    }
    return std::string(FromAwsString(it->second));
  }

  // Convenience overload taking a bucket name
  Result<std::string> GetBucketRegion(const std::string& bucket) {
    S3Model::HeadBucketRequest req;
    req.SetBucket(ToAwsString(bucket));
    return GetBucketRegion(req);
  }

  S3Model::CompleteMultipartUploadOutcome CompleteMultipartUploadWithErrorFixup(
      S3Model::CompleteMultipartUploadRequest&& request) const {
    // CompletedMultipartUpload can return a 200 OK response with an error
    // encoded in the response body, in which case we should either retry
    // or propagate the error to the user (see
    // https://docs.aws.amazon.com/AmazonS3/latest/API/API_CompleteMultipartUpload.html).
    //
    // Unfortunately the AWS SDK doesn't detect such situations but lets them
    // return successfully (see https://github.com/aws/aws-sdk-cpp/issues/658).
    //
    // We work around the issue by registering a DataReceivedEventHandler
    // which parses the XML response for embedded errors.

    // Populated by the handler below if a 200 response carries an embedded error
    util::optional<AWSError<Aws::Client::CoreErrors>> aws_error;

    auto handler = [&](const Aws::Http::HttpRequest* http_req,
                       Aws::Http::HttpResponse* http_resp,
                       long long) {  // NOLINT runtime/int
      auto& stream = http_resp->GetResponseBody();
      const auto pos = stream.tellg();
      const auto doc = Aws::Utils::Xml::XmlDocument::CreateFromXmlStream(stream);
      // Rewind stream for later
      stream.clear();
      stream.seekg(pos);

      if (doc.WasParseSuccessful()) {
        auto root = doc.GetRootElement();
        if (!root.IsNull()) {
          // Detect something that looks like an abnormal CompletedMultipartUpload
          // response.
          if (root.GetName() != "CompleteMultipartUploadResult" ||
              !root.FirstChild("Error").IsNull() || !root.FirstChild("Errors").IsNull()) {
            // Make sure the error marshaller doesn't see a 200 OK
            http_resp->SetResponseCode(
                Aws::Http::HttpResponseCode::INTERNAL_SERVER_ERROR);
            aws_error = GetErrorMarshaller()->Marshall(*http_resp);
            // Rewind stream for later
            stream.clear();
            stream.seekg(pos);
          }
        }
      }
    };

    request.SetDataReceivedEventHandler(std::move(handler));

    // We don't have access to the configured AWS retry strategy
    // (m_retryStrategy is a private member of AwsClient), so don't use that.
    std::unique_ptr<Aws::Client::RetryStrategy> retry_strategy;
    if (s3_retry_strategy_) {
      retry_strategy.reset(new WrappedRetryStrategy(s3_retry_strategy_));
    } else {
      // Note that DefaultRetryStrategy, unlike StandardRetryStrategy,
      // has empty definitions for RequestBookkeeping() and GetSendToken(),
      // which simplifies the code below.
      retry_strategy.reset(new Aws::Client::DefaultRetryStrategy());
    }

    // Issue the request, retrying embedded errors per the retry strategy
    for (int32_t retries = 0;; retries++) {
      aws_error.reset();
      auto outcome = Aws::S3::S3Client::S3Client::CompleteMultipartUpload(request);
      if (!outcome.IsSuccess()) {
        // Error returned in HTTP headers (or client failure)
        return outcome;
      }
      if (!aws_error.has_value()) {
        // Genuinely successful outcome
        return outcome;
      }

      const bool should_retry = retry_strategy->ShouldRetry(*aws_error, retries);

      ARROW_LOG(WARNING)
          << "CompletedMultipartUpload got error embedded in a 200 OK response: "
          << aws_error->GetExceptionName() << " (\"" << aws_error->GetMessage()
          << "\"), retry = " << should_retry;

      if (!should_retry) {
        break;
      }
      const auto delay = std::chrono::milliseconds(
          retry_strategy->CalculateDelayBeforeNextRetry(*aws_error, retries));
      std::this_thread::sleep_for(delay);
    }

    // Retries exhausted (or not retryable): surface the embedded error
    DCHECK(aws_error.has_value());
    auto s3_error = AWSError<S3Errors>(std::move(aws_error).value());
    return S3Model::CompleteMultipartUploadOutcome(std::move(s3_error));
  }

  // Optional Arrow-level retry strategy, used by the fixup above
  // (set by ClientBuilder::BuildClient)
  std::shared_ptr<S3RetryStrategy> s3_retry_strategy_;
};
664 
665 // In AWS SDK < 1.8, Aws::Client::ClientConfiguration::followRedirects is a bool.
// Overload selected when followRedirects is a plain bool (AWS SDK < 1.8).
template <bool Never = false>
void DisableRedirectsImpl(bool* follow_redirects) {
  *follow_redirects = false;
}
670 
671 // In AWS SDK >= 1.8, it's a Aws::Client::FollowRedirectsPolicy scoped enum.
// Overload selected when followRedirects is a FollowRedirectsPolicy
// scoped enum (AWS SDK >= 1.8).
template <typename PolicyEnum, PolicyEnum Never = PolicyEnum::NEVER>
void DisableRedirectsImpl(PolicyEnum* policy) {
  *policy = Never;
}
676 
DisableRedirects(Aws::Client::ClientConfiguration * c)677 void DisableRedirects(Aws::Client::ClientConfiguration* c) {
678   DisableRedirectsImpl(&c->followRedirects);
679 }
680 
681 class ClientBuilder {
682  public:
ClientBuilder(S3Options options)683   explicit ClientBuilder(S3Options options) : options_(std::move(options)) {}
684 
config() const685   const Aws::Client::ClientConfiguration& config() const { return client_config_; }
686 
mutable_config()687   Aws::Client::ClientConfiguration* mutable_config() { return &client_config_; }
688 
BuildClient()689   Result<std::shared_ptr<S3Client>> BuildClient() {
690     credentials_provider_ = options_.credentials_provider;
691     if (!options_.region.empty()) {
692       client_config_.region = ToAwsString(options_.region);
693     }
694     client_config_.endpointOverride = ToAwsString(options_.endpoint_override);
695     if (options_.scheme == "http") {
696       client_config_.scheme = Aws::Http::Scheme::HTTP;
697     } else if (options_.scheme == "https") {
698       client_config_.scheme = Aws::Http::Scheme::HTTPS;
699     } else {
700       return Status::Invalid("Invalid S3 connection scheme '", options_.scheme, "'");
701     }
702     if (options_.retry_strategy) {
703       client_config_.retryStrategy =
704           std::make_shared<WrappedRetryStrategy>(options_.retry_strategy);
705     } else {
706       client_config_.retryStrategy = std::make_shared<ConnectRetryStrategy>();
707     }
708     if (!internal::global_options.tls_ca_file_path.empty()) {
709       client_config_.caFile = ToAwsString(internal::global_options.tls_ca_file_path);
710     }
711     if (!internal::global_options.tls_ca_dir_path.empty()) {
712       client_config_.caPath = ToAwsString(internal::global_options.tls_ca_dir_path);
713     }
714 
715     const bool use_virtual_addressing = options_.endpoint_override.empty();
716 
717     // Set proxy options if provided
718     if (!options_.proxy_options.scheme.empty()) {
719       if (options_.proxy_options.scheme == "http") {
720         client_config_.proxyScheme = Aws::Http::Scheme::HTTP;
721       } else if (options_.proxy_options.scheme == "https") {
722         client_config_.proxyScheme = Aws::Http::Scheme::HTTPS;
723       } else {
724         return Status::Invalid("Invalid proxy connection scheme '",
725                                options_.proxy_options.scheme, "'");
726       }
727     }
728     if (!options_.proxy_options.host.empty()) {
729       client_config_.proxyHost = ToAwsString(options_.proxy_options.host);
730     }
731     if (options_.proxy_options.port != -1) {
732       client_config_.proxyPort = options_.proxy_options.port;
733     }
734     if (!options_.proxy_options.username.empty()) {
735       client_config_.proxyUserName = ToAwsString(options_.proxy_options.username);
736     }
737     if (!options_.proxy_options.password.empty()) {
738       client_config_.proxyPassword = ToAwsString(options_.proxy_options.password);
739     }
740 
741     auto client = std::make_shared<S3Client>(
742         credentials_provider_, client_config_,
743         Aws::Client::AWSAuthV4Signer::PayloadSigningPolicy::Never,
744         use_virtual_addressing);
745     client->s3_retry_strategy_ = options_.retry_strategy;
746     return client;
747   }
748 
  // Read-only access to the options this builder was configured with.
  const S3Options& options() const { return options_; }

 protected:
  S3Options options_;
  // The AWS client configuration derived from options_ (region, endpoint,
  // retry strategy, proxy and TLS settings).
  Aws::Client::ClientConfiguration client_config_;
  std::shared_ptr<Aws::Auth::AWSCredentialsProvider> credentials_provider_;
};
756 
757 // -----------------------------------------------------------------------
758 // S3 region resolver
759 
class RegionResolver {
 public:
  // Create a resolver with the given options and eagerly build its S3 client.
  static Result<std::shared_ptr<RegionResolver>> Make(S3Options options) {
    std::shared_ptr<RegionResolver> resolver(new RegionResolver(std::move(options)));
    RETURN_NOT_OK(resolver->Init());
    return resolver;
  }

  // Return a lazily-created, process-wide resolver using anonymous credentials.
  // Thread-safe: uses atomic load / compare-exchange on the shared_ptr so that
  // concurrent first calls all end up with the same instance.
  static Result<std::shared_ptr<RegionResolver>> DefaultInstance() {
    static std::shared_ptr<RegionResolver> instance;
    auto resolver = arrow::internal::atomic_load(&instance);
    if (resolver) {
      return resolver;
    }
    auto maybe_resolver = Make(S3Options::Anonymous());
    if (!maybe_resolver.ok()) {
      return maybe_resolver;
    }
    // Make sure to always return the same instance even if several threads
    // call DefaultInstance at once.
    std::shared_ptr<RegionResolver> existing;
    if (arrow::internal::atomic_compare_exchange_strong(&instance, &existing,
                                                        *maybe_resolver)) {
      return *maybe_resolver;
    } else {
      return existing;
    }
  }

  // Resolve the region for `bucket`, consulting the in-memory cache first.
  // The lock is released around the network request, so two threads may
  // resolve the same bucket concurrently; the duplicate work is harmless
  // and both end up writing the same value into the cache.
  Result<std::string> ResolveRegion(const std::string& bucket) {
    std::unique_lock<std::mutex> lock(cache_mutex_);
    auto it = cache_.find(bucket);
    if (it != cache_.end()) {
      return it->second;
    }
    lock.unlock();
    ARROW_ASSIGN_OR_RAISE(auto region, ResolveRegionUncached(bucket));
    lock.lock();
    // Note we don't cache a non-existent bucket, as the bucket could be created later
    cache_[bucket] = region;
    return region;
  }

  // Issue the actual region lookup, bypassing the cache.
  Result<std::string> ResolveRegionUncached(const std::string& bucket) {
    return client_->GetBucketRegion(bucket);
  }

 protected:
  explicit RegionResolver(S3Options options) : builder_(std::move(options)) {}

  Status Init() {
    // Region resolution relies on real AWS endpoints, hence no endpoint override.
    DCHECK(builder_.options().endpoint_override.empty());
    // On Windows with AWS SDK >= 1.8, it is necessary to disable redirects (ARROW-10085).
    DisableRedirects(builder_.mutable_config());
    return builder_.BuildClient().Value(&client_);
  }

  ClientBuilder builder_;
  std::shared_ptr<S3Client> client_;

  // Guards cache_ below.
  std::mutex cache_mutex_;
  // XXX Should cache size be bounded?  It must be quite unusual to query millions
  // of different buckets in a single program invocation...
  std::unordered_map<std::string, std::string> cache_;
};
825 
826 // -----------------------------------------------------------------------
827 // S3 file stream implementations
828 
829 // A non-copying iostream.
830 // See https://stackoverflow.com/questions/35322033/aws-c-sdk-uploadpart-times-out
831 // https://stackoverflow.com/questions/13059091/creating-an-input-stream-from-constant-memory
832 class StringViewStream : Aws::Utils::Stream::PreallocatedStreamBuf, public std::iostream {
833  public:
StringViewStream(const void * data,int64_t nbytes)834   StringViewStream(const void* data, int64_t nbytes)
835       : Aws::Utils::Stream::PreallocatedStreamBuf(
836             reinterpret_cast<unsigned char*>(const_cast<void*>(data)),
837             static_cast<size_t>(nbytes)),
838         std::iostream(this) {}
839 };
840 
841 // By default, the AWS SDK reads object data into an auto-growing StringStream.
842 // To avoid copies, read directly into our preallocated buffer instead.
843 // See https://github.com/aws/aws-sdk-cpp/issues/64 for an alternative but
844 // functionally similar recipe.
AwsWriteableStreamFactory(void * data,int64_t nbytes)845 Aws::IOStreamFactory AwsWriteableStreamFactory(void* data, int64_t nbytes) {
846   return [=]() { return Aws::New<StringViewStream>("", data, nbytes); };
847 }
848 
GetObjectRange(Aws::S3::S3Client * client,const S3Path & path,int64_t start,int64_t length,void * out)849 Result<S3Model::GetObjectResult> GetObjectRange(Aws::S3::S3Client* client,
850                                                 const S3Path& path, int64_t start,
851                                                 int64_t length, void* out) {
852   S3Model::GetObjectRequest req;
853   req.SetBucket(ToAwsString(path.bucket));
854   req.SetKey(ToAwsString(path.key));
855   req.SetRange(ToAwsString(FormatRange(start, length)));
856   req.SetResponseStreamFactory(AwsWriteableStreamFactory(out, length));
857   return OutcomeToResult(client->GetObject(req));
858 }
859 
860 template <typename ObjectResult>
GetObjectMetadata(const ObjectResult & result)861 std::shared_ptr<const KeyValueMetadata> GetObjectMetadata(const ObjectResult& result) {
862   auto md = std::make_shared<KeyValueMetadata>();
863 
864   auto push = [&](std::string k, const Aws::String& v) {
865     if (!v.empty()) {
866       md->Append(std::move(k), FromAwsString(v).to_string());
867     }
868   };
869   auto push_datetime = [&](std::string k, const Aws::Utils::DateTime& v) {
870     if (v != Aws::Utils::DateTime(0.0)) {
871       push(std::move(k), v.ToGmtString(Aws::Utils::DateFormat::ISO_8601));
872     }
873   };
874 
875   md->Append("Content-Length", std::to_string(result.GetContentLength()));
876   push("Cache-Control", result.GetCacheControl());
877   push("Content-Type", result.GetContentType());
878   push("Content-Language", result.GetContentLanguage());
879   push("ETag", result.GetETag());
880   push("VersionId", result.GetVersionId());
881   push_datetime("Last-Modified", result.GetLastModified());
882   push_datetime("Expires", result.GetExpires());
883   // NOTE the "canned ACL" isn't available for reading (one can get an expanded
884   // ACL using a separate GetObjectAcl request)
885   return md;
886 }
887 
888 template <typename ObjectRequest>
889 struct ObjectMetadataSetter {
890   using Setter = std::function<Status(const std::string& value, ObjectRequest* req)>;
891 
GetSettersarrow::fs::__anon5c130cae0311::ObjectMetadataSetter892   static std::unordered_map<std::string, Setter> GetSetters() {
893     return {{"ACL", CannedACLSetter()},
894             {"Cache-Control", StringSetter(&ObjectRequest::SetCacheControl)},
895             {"Content-Type", StringSetter(&ObjectRequest::SetContentType)},
896             {"Content-Language", StringSetter(&ObjectRequest::SetContentLanguage)},
897             {"Expires", DateTimeSetter(&ObjectRequest::SetExpires)}};
898   }
899 
900  private:
StringSetterarrow::fs::__anon5c130cae0311::ObjectMetadataSetter901   static Setter StringSetter(void (ObjectRequest::*req_method)(Aws::String&&)) {
902     return [req_method](const std::string& v, ObjectRequest* req) {
903       (req->*req_method)(ToAwsString(v));
904       return Status::OK();
905     };
906   }
907 
DateTimeSetterarrow::fs::__anon5c130cae0311::ObjectMetadataSetter908   static Setter DateTimeSetter(
909       void (ObjectRequest::*req_method)(Aws::Utils::DateTime&&)) {
910     return [req_method](const std::string& v, ObjectRequest* req) {
911       (req->*req_method)(
912           Aws::Utils::DateTime(v.data(), Aws::Utils::DateFormat::ISO_8601));
913       return Status::OK();
914     };
915   }
916 
CannedACLSetterarrow::fs::__anon5c130cae0311::ObjectMetadataSetter917   static Setter CannedACLSetter() {
918     return [](const std::string& v, ObjectRequest* req) {
919       ARROW_ASSIGN_OR_RAISE(auto acl, ParseACL(v));
920       req->SetACL(acl);
921       return Status::OK();
922     };
923   }
924 
ParseACLarrow::fs::__anon5c130cae0311::ObjectMetadataSetter925   static Result<S3Model::ObjectCannedACL> ParseACL(const std::string& v) {
926     if (v.empty()) {
927       return S3Model::ObjectCannedACL::NOT_SET;
928     }
929     auto acl = S3Model::ObjectCannedACLMapper::GetObjectCannedACLForName(ToAwsString(v));
930     if (acl == S3Model::ObjectCannedACL::NOT_SET) {
931       // XXX This actually never happens, as the AWS SDK dynamically
932       // expands the enum range using Aws::GetEnumOverflowContainer()
933       return Status::Invalid("Invalid S3 canned ACL: '", v, "'");
934     }
935     return acl;
936   }
937 };
938 
939 template <typename ObjectRequest>
SetObjectMetadata(const std::shared_ptr<const KeyValueMetadata> & metadata,ObjectRequest * req)940 Status SetObjectMetadata(const std::shared_ptr<const KeyValueMetadata>& metadata,
941                          ObjectRequest* req) {
942   static auto setters = ObjectMetadataSetter<ObjectRequest>::GetSetters();
943 
944   DCHECK_NE(metadata, nullptr);
945   const auto& keys = metadata->keys();
946   const auto& values = metadata->values();
947 
948   for (size_t i = 0; i < keys.size(); ++i) {
949     auto it = setters.find(keys[i]);
950     if (it != setters.end()) {
951       RETURN_NOT_OK(it->second(values[i], req));
952     }
953   }
954   return Status::OK();
955 }
956 
957 // A RandomAccessFile that reads from a S3 object
class ObjectInputFile final : public io::RandomAccessFile {
 public:
  // If `size` is already known (e.g. from a prior listing), pass it to
  // avoid the HEAD Object request in Init().
  ObjectInputFile(std::shared_ptr<Aws::S3::S3Client> client,
                  const io::IOContext& io_context, const S3Path& path,
                  int64_t size = kNoSize)
      : client_(std::move(client)),
        io_context_(io_context),
        path_(path),
        content_length_(size) {}

  Status Init() {
    // Issue a HEAD Object to get the content-length and ensure any
    // errors (e.g. file not found) don't wait until the first Read() call.
    if (content_length_ != kNoSize) {
      DCHECK_GE(content_length_, 0);
      return Status::OK();
    }

    S3Model::HeadObjectRequest req;
    req.SetBucket(ToAwsString(path_.bucket));
    req.SetKey(ToAwsString(path_.key));

    auto outcome = client_->HeadObject(req);
    if (!outcome.IsSuccess()) {
      if (IsNotFound(outcome.GetError())) {
        return PathNotFound(path_);
      } else {
        return ErrorToStatus(
            std::forward_as_tuple("When reading information for key '", path_.key,
                                  "' in bucket '", path_.bucket, "': "),
            outcome.GetError());
      }
    }
    content_length_ = outcome.GetResult().GetContentLength();
    DCHECK_GE(content_length_, 0);
    // Also cache the object metadata from the HEAD response, for later
    // retrieval through ReadMetadata().
    metadata_ = GetObjectMetadata(outcome.GetResult());
    return Status::OK();
  }

  // Return an error if the stream was already closed.
  Status CheckClosed() const {
    if (closed_) {
      return Status::Invalid("Operation on closed stream");
    }
    return Status::OK();
  }

  // Validate that `position` is a valid offset in [0, content_length_].
  // `action` ("read", "seek") is used in the error message.
  Status CheckPosition(int64_t position, const char* action) const {
    if (position < 0) {
      return Status::Invalid("Cannot ", action, " from negative position");
    }
    if (position > content_length_) {
      return Status::IOError("Cannot ", action, " past end of file");
    }
    return Status::OK();
  }

  // RandomAccessFile APIs

  Result<std::shared_ptr<const KeyValueMetadata>> ReadMetadata() override {
    // Metadata was fetched by Init(); no I/O here.
    return metadata_;
  }

  Future<std::shared_ptr<const KeyValueMetadata>> ReadMetadataAsync(
      const io::IOContext& io_context) override {
    // Metadata is already available, so return an immediately-finished future.
    return metadata_;
  }

  Status Close() override {
    // Dropping the client both frees resources and makes further use fail fast.
    client_ = nullptr;
    closed_ = true;
    return Status::OK();
  }

  bool closed() const override { return closed_; }

  Result<int64_t> Tell() const override {
    RETURN_NOT_OK(CheckClosed());
    return pos_;
  }

  Result<int64_t> GetSize() override {
    RETURN_NOT_OK(CheckClosed());
    return content_length_;
  }

  Status Seek(int64_t position) override {
    RETURN_NOT_OK(CheckClosed());
    RETURN_NOT_OK(CheckPosition(position, "seek"));

    pos_ = position;
    return Status::OK();
  }

  Result<int64_t> ReadAt(int64_t position, int64_t nbytes, void* out) override {
    RETURN_NOT_OK(CheckClosed());
    RETURN_NOT_OK(CheckPosition(position, "read"));

    // Clamp to the number of bytes remaining in the object
    nbytes = std::min(nbytes, content_length_ - position);
    if (nbytes == 0) {
      return 0;
    }

    // Read the desired range of bytes
    ARROW_ASSIGN_OR_RAISE(S3Model::GetObjectResult result,
                          GetObjectRange(client_.get(), path_, position, nbytes, out));

    // The body was already written into `out` by the preallocated response
    // stream; ignore() advances the stream so gcount() reflects the number
    // of bytes actually received.
    auto& stream = result.GetBody();
    stream.ignore(nbytes);
    // NOTE: the stream is a stringstream by default, there is no actual error
    // to check for.  However, stream.fail() may return true if EOF is reached.
    return stream.gcount();
  }

  Result<std::shared_ptr<Buffer>> ReadAt(int64_t position, int64_t nbytes) override {
    RETURN_NOT_OK(CheckClosed());
    RETURN_NOT_OK(CheckPosition(position, "read"));

    // No need to allocate more than the remaining number of bytes
    nbytes = std::min(nbytes, content_length_ - position);

    ARROW_ASSIGN_OR_RAISE(auto buf, AllocateResizableBuffer(nbytes, io_context_.pool()));
    if (nbytes > 0) {
      ARROW_ASSIGN_OR_RAISE(int64_t bytes_read,
                            ReadAt(position, nbytes, buf->mutable_data()));
      DCHECK_LE(bytes_read, nbytes);
      // Shrink the buffer if fewer bytes were received than requested
      RETURN_NOT_OK(buf->Resize(bytes_read));
    }
    return std::move(buf);
  }

  Result<int64_t> Read(int64_t nbytes, void* out) override {
    // Sequential read: delegate to ReadAt and advance the stream position.
    ARROW_ASSIGN_OR_RAISE(int64_t bytes_read, ReadAt(pos_, nbytes, out));
    pos_ += bytes_read;
    return bytes_read;
  }

  Result<std::shared_ptr<Buffer>> Read(int64_t nbytes) override {
    ARROW_ASSIGN_OR_RAISE(auto buffer, ReadAt(pos_, nbytes));
    pos_ += buffer->size();
    return std::move(buffer);
  }

 protected:
  std::shared_ptr<Aws::S3::S3Client> client_;
  const io::IOContext io_context_;
  S3Path path_;

  bool closed_ = false;
  // Current position for the sequential Read()/Tell()/Seek() API
  int64_t pos_ = 0;
  // Object size; set at construction or by Init()
  int64_t content_length_ = kNoSize;
  std::shared_ptr<const KeyValueMetadata> metadata_;
};
1110 
// Minimum size for each part of a multipart upload, except for the last part.
// AWS doc says "5 MB" but it's not clear whether those are MB or MiB,
// so I chose the safer value (5 MiB = 5 * 2^20 bytes).
// (see https://docs.aws.amazon.com/AmazonS3/latest/API/mpUploadUploadPart.html)
static constexpr int64_t kMinimumPartUpload = 5 * 1024 * 1024;
1116 
1117 // An OutputStream that writes to a S3 object
class ObjectOutputStream final : public io::OutputStream {
 protected:
  struct UploadState;

 public:
  ObjectOutputStream(std::shared_ptr<S3Client> client, const io::IOContext& io_context,
                     const S3Path& path, const S3Options& options,
                     const std::shared_ptr<const KeyValueMetadata>& metadata)
      : client_(std::move(client)),
        io_context_(io_context),
        path_(path),
        metadata_(metadata),
        default_metadata_(options.default_metadata),
        background_writes_(options.background_writes) {}

  ~ObjectOutputStream() override {
    // For compliance with the rest of the IO stack, Close rather than Abort,
    // even though it may be more expensive.
    io::internal::CloseFromDestructor(this);
  }

  Status Init() {
    // Initiate the multi-part upload
    S3Model::CreateMultipartUploadRequest req;
    req.SetBucket(ToAwsString(path_.bucket));
    req.SetKey(ToAwsString(path_.key));
    // Explicit metadata takes precedence over the options-level default
    if (metadata_ && metadata_->size() != 0) {
      RETURN_NOT_OK(SetObjectMetadata(metadata_, &req));
    } else if (default_metadata_ && default_metadata_->size() != 0) {
      RETURN_NOT_OK(SetObjectMetadata(default_metadata_, &req));
    }

    auto outcome = client_->CreateMultipartUpload(req);
    if (!outcome.IsSuccess()) {
      return ErrorToStatus(
          std::forward_as_tuple("When initiating multiple part upload for key '",
                                path_.key, "' in bucket '", path_.bucket, "': "),
          outcome.GetError());
    }
    upload_id_ = outcome.GetResult().GetUploadId();
    upload_state_ = std::make_shared<UploadState>();
    closed_ = false;
    return Status::OK();
  }

  // Cancel the multipart upload; already-uploaded parts are discarded by S3.
  Status Abort() override {
    if (closed_) {
      return Status::OK();
    }

    S3Model::AbortMultipartUploadRequest req;
    req.SetBucket(ToAwsString(path_.bucket));
    req.SetKey(ToAwsString(path_.key));
    req.SetUploadId(upload_id_);

    auto outcome = client_->AbortMultipartUpload(req);
    if (!outcome.IsSuccess()) {
      return ErrorToStatus(
          std::forward_as_tuple("When aborting multiple part upload for key '", path_.key,
                                "' in bucket '", path_.bucket, "': "),
          outcome.GetError());
    }
    current_part_.reset();
    client_ = nullptr;
    closed_ = true;
    return Status::OK();
  }

  // OutputStream interface

  Status Close() override {
    if (closed_) {
      return Status::OK();
    }

    if (current_part_) {
      // Upload last part
      RETURN_NOT_OK(CommitCurrentPart());
    }

    // S3 mandates at least one part, upload an empty one if necessary
    if (part_number_ == 1) {
      RETURN_NOT_OK(UploadPart("", 0));
    }

    // Wait for in-progress uploads to finish (if async writes are enabled)
    RETURN_NOT_OK(Flush());

    // At this point, all part uploads have finished successfully
    DCHECK_GT(part_number_, 1);
    DCHECK_EQ(upload_state_->completed_parts.size(),
              static_cast<size_t>(part_number_ - 1));

    // Assemble the completed part list and finalize the multipart upload
    S3Model::CompletedMultipartUpload completed_upload;
    completed_upload.SetParts(upload_state_->completed_parts);
    S3Model::CompleteMultipartUploadRequest req;
    req.SetBucket(ToAwsString(path_.bucket));
    req.SetKey(ToAwsString(path_.key));
    req.SetUploadId(upload_id_);
    req.SetMultipartUpload(std::move(completed_upload));

    auto outcome = client_->CompleteMultipartUploadWithErrorFixup(std::move(req));
    if (!outcome.IsSuccess()) {
      return ErrorToStatus(
          std::forward_as_tuple("When completing multiple part upload for key '",
                                path_.key, "' in bucket '", path_.bucket, "': "),
          outcome.GetError());
    }

    client_ = nullptr;
    closed_ = true;
    return Status::OK();
  }

  bool closed() const override { return closed_; }

  Result<int64_t> Tell() const override {
    if (closed_) {
      return Status::Invalid("Operation on closed stream");
    }
    return pos_;
  }

  Status Write(const std::shared_ptr<Buffer>& buffer) override {
    // Pass the buffer along so DoWrite can avoid copying the data
    return DoWrite(buffer->data(), buffer->size(), buffer);
  }

  Status Write(const void* data, int64_t nbytes) override {
    return DoWrite(data, nbytes);
  }

  // Common write path.  `owned_buffer`, if non-null, owns `data` and can be
  // kept alive for background uploads without copying.
  Status DoWrite(const void* data, int64_t nbytes,
                 std::shared_ptr<Buffer> owned_buffer = nullptr) {
    if (closed_) {
      return Status::Invalid("Operation on closed stream");
    }

    if (!current_part_ && nbytes >= part_upload_threshold_) {
      // No current part and data large enough, upload it directly
      // (without copying if the buffer is owned)
      RETURN_NOT_OK(UploadPart(data, nbytes, owned_buffer));
      pos_ += nbytes;
      return Status::OK();
    }
    // Can't upload data on its own, need to buffer it
    if (!current_part_) {
      ARROW_ASSIGN_OR_RAISE(
          current_part_,
          io::BufferOutputStream::Create(part_upload_threshold_, io_context_.pool()));
      current_part_size_ = 0;
    }
    RETURN_NOT_OK(current_part_->Write(data, nbytes));
    pos_ += nbytes;
    current_part_size_ += nbytes;

    if (current_part_size_ >= part_upload_threshold_) {
      // Current part large enough, upload it
      RETURN_NOT_OK(CommitCurrentPart());
    }

    return Status::OK();
  }

  Status Flush() override {
    if (closed_) {
      return Status::Invalid("Operation on closed stream");
    }
    // Wait for background writes to finish
    std::unique_lock<std::mutex> lock(upload_state_->mutex);
    upload_state_->cv.wait(lock,
                           [this]() { return upload_state_->parts_in_progress == 0; });
    // Return the accumulated status of all background part uploads
    return upload_state_->status;
  }

  // Upload-related helpers

  // Finalize the currently-buffered part and upload it.
  Status CommitCurrentPart() {
    ARROW_ASSIGN_OR_RAISE(auto buf, current_part_->Finish());
    current_part_.reset();
    current_part_size_ = 0;
    return UploadPart(buf);
  }

  Status UploadPart(std::shared_ptr<Buffer> buffer) {
    return UploadPart(buffer->data(), buffer->size(), buffer);
  }

  // Upload one part, either synchronously or (if background_writes_) on the
  // IO executor.  `owned_buffer`, if non-null, must own `data`.
  Status UploadPart(const void* data, int64_t nbytes,
                    std::shared_ptr<Buffer> owned_buffer = nullptr) {
    S3Model::UploadPartRequest req;
    req.SetBucket(ToAwsString(path_.bucket));
    req.SetKey(ToAwsString(path_.key));
    req.SetUploadId(upload_id_);
    req.SetPartNumber(part_number_);
    req.SetContentLength(nbytes);

    if (!background_writes_) {
      // Synchronous upload: stream directly from the caller's memory
      req.SetBody(std::make_shared<StringViewStream>(data, nbytes));
      auto outcome = client_->UploadPart(req);
      if (!outcome.IsSuccess()) {
        return UploadPartError(req, outcome);
      } else {
        AddCompletedPart(upload_state_, part_number_, outcome.GetResult());
      }
    } else {
      // If the data isn't owned, make an immutable copy for the lifetime of the closure
      if (owned_buffer == nullptr) {
        ARROW_ASSIGN_OR_RAISE(owned_buffer, AllocateBuffer(nbytes, io_context_.pool()));
        memcpy(owned_buffer->mutable_data(), data, nbytes);
      } else {
        DCHECK_EQ(data, owned_buffer->data());
        DCHECK_EQ(nbytes, owned_buffer->size());
      }
      req.SetBody(
          std::make_shared<StringViewStream>(owned_buffer->data(), owned_buffer->size()));

      {
        // Count this part before submitting so Flush() can wait on it
        std::unique_lock<std::mutex> lock(upload_state_->mutex);
        ++upload_state_->parts_in_progress;
      }
      auto client = client_;
      ARROW_ASSIGN_OR_RAISE(auto fut, SubmitIO(io_context_, [client, req]() {
                              return client->UploadPart(req);
                            }));
      // The closure keeps the buffer and the upload state alive
      auto state = upload_state_;
      auto part_number = part_number_;
      auto handler = [owned_buffer, state, part_number,
                      req](const Result<S3Model::UploadPartOutcome>& result) -> void {
        HandleUploadOutcome(state, part_number, req, result);
      };
      fut.AddCallback(std::move(handler));
    }

    ++part_number_;
    // With up to 10000 parts in an upload (S3 limit), a stream writing chunks
    // of exactly 5MB would be limited to 50GB total.  To avoid that, we bump
    // the upload threshold every 100 parts.  So the pattern is:
    // - part 1 to 99: 5MB threshold
    // - part 100 to 199: 10MB threshold
    // - part 200 to 299: 15MB threshold
    // ...
    // - part 9900 to 9999: 500MB threshold
    // So the total size limit is 2475000MB or ~2.4TB, while keeping manageable
    // chunk sizes and avoiding too much buffering in the common case of a small-ish
    // stream.  If the limit's not enough, we can revisit.
    if (part_number_ % 100 == 0) {
      part_upload_threshold_ += kMinimumPartUpload;
    }

    return Status::OK();
  }

  // Completion callback for background part uploads.  Records the result
  // (or accumulates the error) under the state lock and wakes up Flush()
  // when the last in-progress part finishes.
  static void HandleUploadOutcome(const std::shared_ptr<UploadState>& state,
                                  int part_number, const S3Model::UploadPartRequest& req,
                                  const Result<S3Model::UploadPartOutcome>& result) {
    std::unique_lock<std::mutex> lock(state->mutex);
    if (!result.ok()) {
      state->status &= result.status();
    } else {
      const auto& outcome = *result;
      if (!outcome.IsSuccess()) {
        state->status &= UploadPartError(req, outcome);
      } else {
        AddCompletedPart(state, part_number, outcome.GetResult());
      }
    }
    // Notify completion
    if (--state->parts_in_progress == 0) {
      state->cv.notify_all();
    }
  }

  // Record a successfully uploaded part at its (1-based) slot.  Parts may
  // complete out of order, so the vector is grown on demand.
  static void AddCompletedPart(const std::shared_ptr<UploadState>& state, int part_number,
                               const S3Model::UploadPartResult& result) {
    S3Model::CompletedPart part;
    // Append ETag and part number for this uploaded part
    // (will be needed for upload completion in Close())
    part.SetPartNumber(part_number);
    part.SetETag(result.GetETag());
    int slot = part_number - 1;
    if (state->completed_parts.size() <= static_cast<size_t>(slot)) {
      state->completed_parts.resize(slot + 1);
    }
    DCHECK(!state->completed_parts[slot].PartNumberHasBeenSet());
    state->completed_parts[slot] = std::move(part);
  }

  static Status UploadPartError(const S3Model::UploadPartRequest& req,
                                const S3Model::UploadPartOutcome& outcome) {
    return ErrorToStatus(
        std::forward_as_tuple("When uploading part for key '", req.GetKey(),
                              "' in bucket '", req.GetBucket(), "': "),
        outcome.GetError());
  }

 protected:
  std::shared_ptr<S3Client> client_;
  const io::IOContext io_context_;
  const S3Path path_;
  const std::shared_ptr<const KeyValueMetadata> metadata_;
  const std::shared_ptr<const KeyValueMetadata> default_metadata_;
  const bool background_writes_;

  Aws::String upload_id_;
  // Starts true; Init() flips it to false once the multipart upload exists
  bool closed_ = true;
  int64_t pos_ = 0;
  // Next part number to use (S3 part numbers are 1-based)
  int32_t part_number_ = 1;
  // Buffer accumulating data until it reaches part_upload_threshold_
  std::shared_ptr<io::BufferOutputStream> current_part_;
  int64_t current_part_size_ = 0;
  int64_t part_upload_threshold_ = kMinimumPartUpload;

  // This struct is kept alive through background writes to avoid problems
  // in the completion handler.
  struct UploadState {
    std::mutex mutex;
    std::condition_variable cv;
    Aws::Vector<S3Model::CompletedPart> completed_parts;
    int64_t parts_in_progress = 0;
    // Accumulated status of all part uploads so far
    Status status;
  };
  std::shared_ptr<UploadState> upload_state_;
};
1441 
1442 // This function assumes info->path() is already set
FileObjectToInfo(const S3Model::HeadObjectResult & obj,FileInfo * info)1443 void FileObjectToInfo(const S3Model::HeadObjectResult& obj, FileInfo* info) {
1444   info->set_type(FileType::File);
1445   info->set_size(static_cast<int64_t>(obj.GetContentLength()));
1446   info->set_mtime(FromAwsDatetime(obj.GetLastModified()));
1447 }
1448 
FileObjectToInfo(const S3Model::Object & obj,FileInfo * info)1449 void FileObjectToInfo(const S3Model::Object& obj, FileInfo* info) {
1450   info->set_type(FileType::File);
1451   info->set_size(static_cast<int64_t>(obj.GetSize()));
1452   info->set_mtime(FromAwsDatetime(obj.GetLastModified()));
1453 }
1454 
// Concurrently walks a "directory" tree rooted at `base_dir_` inside `bucket_`
// by issuing ListObjectsV2 requests on the IOContext's executor, one task per
// visited prefix (plus one per continuation page).  The three user-supplied
// handlers are always invoked with `mutex_` held, i.e. never concurrently.
// Tasks keep the walker alive through shared_from_this().
struct TreeWalker : public std::enable_shared_from_this<TreeWalker> {
  // Invoked (serialized) with each page of results for a given prefix.
  using ResultHandler = std::function<Status(const std::string& prefix,
                                             const S3Model::ListObjectsV2Result&)>;
  // Invoked (serialized) when a ListObjectsV2 call fails; may translate the
  // AWS error into OK (e.g. to tolerate "not found").
  using ErrorHandler = std::function<Status(const AWSError<S3Errors>& error)>;
  // Invoked (serialized) before descending into child prefixes; the returned
  // bool decides whether recursion proceeds at that depth.
  using RecursionHandler = std::function<Result<bool>(int32_t nesting_depth)>;

  std::shared_ptr<Aws::S3::S3Client> client_;
  io::IOContext io_context_;
  const std::string bucket_;
  const std::string base_dir_;
  // MaxKeys value set on every ListObjectsV2 request (page size)
  const int32_t max_keys_;
  const ResultHandler result_handler_;
  const ErrorHandler error_handler_;
  const RecursionHandler recursion_handler_;

  // Synchronous walk: blocks until every spawned list task has completed.
  template <typename... Args>
  static Status Walk(Args&&... args) {
    return WalkAsync(std::forward<Args>(args)...).status();
  }

  // Asynchronous walk: the returned future completes when the task group
  // finishes (normally, on first error, or on stop request).
  template <typename... Args>
  static Future<> WalkAsync(Args&&... args) {
    auto self = std::make_shared<TreeWalker>(std::forward<Args>(args)...);
    return self->DoWalk();
  }

  TreeWalker(std::shared_ptr<Aws::S3::S3Client> client, io::IOContext io_context,
             std::string bucket, std::string base_dir, int32_t max_keys,
             ResultHandler result_handler, ErrorHandler error_handler,
             RecursionHandler recursion_handler)
      : client_(std::move(client)),
        io_context_(io_context),
        bucket_(std::move(bucket)),
        base_dir_(std::move(base_dir)),
        max_keys_(max_keys),
        result_handler_(std::move(result_handler)),
        error_handler_(std::move(error_handler)),
        recursion_handler_(std::move(recursion_handler)) {}

 private:
  std::shared_ptr<TaskGroup> task_group_;
  // Serializes invocations of result_handler_/error_handler_/recursion_handler_
  std::mutex mutex_;

  // Kick off the walk at the root prefix and hand back the task group future.
  Future<> DoWalk() {
    task_group_ =
        TaskGroup::MakeThreaded(io_context_.executor(), io_context_.stop_token());
    WalkChild(base_dir_, /*nesting_depth=*/0);
    // When this returns, ListObjectsV2 tasks either have finished or will exit early
    return task_group_->FinishAsync();
  }

  // True while the task group has seen no error and no stop request.
  bool ok() const { return task_group_->ok(); }

  // Page fetcher for one prefix.  Copies of this handler (carrying an updated
  // continuation token) are re-spawned for truncated results.
  struct ListObjectsV2Handler {
    std::shared_ptr<TreeWalker> walker;  // keeps the walker alive across tasks
    std::string prefix;
    int32_t nesting_depth;
    S3Model::ListObjectsV2Request req;

    Status operator()(const Result<S3Model::ListObjectsV2Outcome>& result) {
      // Serialize calls to operation-specific handlers
      if (!walker->ok()) {
        // Early exit: avoid executing handlers if DoWalk() returned
        return Status::OK();
      }
      if (!result.ok()) {
        return result.status();
      }
      const auto& outcome = *result;
      if (!outcome.IsSuccess()) {
        {
          std::lock_guard<std::mutex> guard(walker->mutex_);
          return walker->error_handler_(outcome.GetError());
        }
      }
      return HandleResult(outcome.GetResult());
    }

    // Schedule this handler's current request on the task group.
    void SpawnListObjectsV2() {
      auto cb = *this;
      walker->task_group_->Append([cb]() mutable {
        Result<S3Model::ListObjectsV2Outcome> result =
            cb.walker->client_->ListObjectsV2(cb.req);
        return cb(result);
      });
    }

    // Process one successful page: notify handlers, maybe recurse into
    // children, maybe continue pagination.
    Status HandleResult(const S3Model::ListObjectsV2Result& result) {
      bool recurse;
      {
        // Only one thread should be running result_handler_/recursion_handler_ at a time
        std::lock_guard<std::mutex> guard(walker->mutex_);
        recurse = result.GetCommonPrefixes().size() > 0;
        if (recurse) {
          ARROW_ASSIGN_OR_RAISE(auto maybe_recurse,
                                walker->recursion_handler_(nesting_depth + 1));
          recurse &= maybe_recurse;
        }
        RETURN_NOT_OK(walker->result_handler_(prefix, result));
      }
      if (recurse) {
        walker->WalkChildren(result, nesting_depth + 1);
      }
      // If the result was truncated, issue a continuation request to get
      // further directory entries.
      if (result.GetIsTruncated()) {
        DCHECK(!result.GetNextContinuationToken().empty());
        req.SetContinuationToken(result.GetNextContinuationToken());
        SpawnListObjectsV2();
      }
      return Status::OK();
    }

    // Build the initial request for `prefix` and spawn it.
    void Start() {
      req.SetBucket(ToAwsString(walker->bucket_));
      if (!prefix.empty()) {
        req.SetPrefix(ToAwsString(prefix) + kSep);
      }
      req.SetDelimiter(Aws::String() + kSep);
      req.SetMaxKeys(walker->max_keys_);
      SpawnListObjectsV2();
    }
  };

  // Spawn a list task for child "directory" `key` at `nesting_depth`.
  void WalkChild(std::string key, int32_t nesting_depth) {
    ListObjectsV2Handler handler{shared_from_this(), std::move(key), nesting_depth, {}};
    handler.Start();
  }

  // Spawn list tasks for every common prefix ("subdirectory") in `result`.
  void WalkChildren(const S3Model::ListObjectsV2Result& result, int32_t nesting_depth) {
    for (const auto& prefix : result.GetCommonPrefixes()) {
      const auto child_key =
          internal::RemoveTrailingSlash(FromAwsString(prefix.GetPrefix()));
      WalkChild(std::string{child_key}, nesting_depth);
    }
  }

  friend struct ListObjectsV2Handler;
};
1594 
1595 }  // namespace
1596 
1597 // -----------------------------------------------------------------------
1598 // S3 filesystem implementation
1599 
1600 class S3FileSystem::Impl : public std::enable_shared_from_this<S3FileSystem::Impl> {
1601  public:
1602   ClientBuilder builder_;
1603   io::IOContext io_context_;
1604   std::shared_ptr<S3Client> client_;
1605   util::optional<S3Backend> backend_;
1606 
1607   const int32_t kListObjectsMaxKeys = 1000;
1608   // At most 1000 keys per multiple-delete request
1609   const int32_t kMultipleDeleteMaxKeys = 1000;
1610   // Limit recursing depth, since a recursion bomb can be created
1611   const int32_t kMaxNestingDepth = 100;
1612 
Impl(S3Options options,io::IOContext io_context)1613   explicit Impl(S3Options options, io::IOContext io_context)
1614       : builder_(std::move(options)), io_context_(io_context) {}
1615 
Init()1616   Status Init() { return builder_.BuildClient().Value(&client_); }
1617 
options() const1618   const S3Options& options() const { return builder_.options(); }
1619 
region() const1620   std::string region() const {
1621     return std::string(FromAwsString(builder_.config().region));
1622   }
1623 
1624   template <typename Error>
SaveBackend(const Aws::Client::AWSError<Error> & error)1625   void SaveBackend(const Aws::Client::AWSError<Error>& error) {
1626     if (!backend_ || *backend_ == S3Backend::Other) {
1627       backend_ = DetectS3Backend(error);
1628     }
1629   }
1630 
1631   // Tests to see if a bucket exists
BucketExists(const std::string & bucket)1632   Result<bool> BucketExists(const std::string& bucket) {
1633     S3Model::HeadBucketRequest req;
1634     req.SetBucket(ToAwsString(bucket));
1635 
1636     auto outcome = client_->HeadBucket(req);
1637     if (!outcome.IsSuccess()) {
1638       if (!IsNotFound(outcome.GetError())) {
1639         return ErrorToStatus(std::forward_as_tuple(
1640                                  "When testing for existence of bucket '", bucket, "': "),
1641                              outcome.GetError());
1642       }
1643       return false;
1644     }
1645     return true;
1646   }
1647 
1648   // Create a bucket.  Successful if bucket already exists.
CreateBucket(const std::string & bucket)1649   Status CreateBucket(const std::string& bucket) {
1650     S3Model::CreateBucketConfiguration config;
1651     S3Model::CreateBucketRequest req;
1652     auto _region = region();
1653     // AWS S3 treats the us-east-1 differently than other regions
1654     // https://docs.aws.amazon.com/cli/latest/reference/s3api/create-bucket.html
1655     if (_region != "us-east-1") {
1656       config.SetLocationConstraint(
1657           S3Model::BucketLocationConstraintMapper::GetBucketLocationConstraintForName(
1658               ToAwsString(_region)));
1659     }
1660     req.SetBucket(ToAwsString(bucket));
1661     req.SetCreateBucketConfiguration(config);
1662 
1663     auto outcome = client_->CreateBucket(req);
1664     if (!outcome.IsSuccess() && !IsAlreadyExists(outcome.GetError())) {
1665       return ErrorToStatus(std::forward_as_tuple("When creating bucket '", bucket, "': "),
1666                            outcome.GetError());
1667     }
1668     return Status::OK();
1669   }
1670 
1671   // Create an object with empty contents.  Successful if object already exists.
CreateEmptyObject(const std::string & bucket,const std::string & key)1672   Status CreateEmptyObject(const std::string& bucket, const std::string& key) {
1673     S3Model::PutObjectRequest req;
1674     req.SetBucket(ToAwsString(bucket));
1675     req.SetKey(ToAwsString(key));
1676     return OutcomeToStatus(
1677         std::forward_as_tuple("When creating key '", key, "' in bucket '", bucket, "': "),
1678         client_->PutObject(req));
1679   }
1680 
CreateEmptyDir(const std::string & bucket,const std::string & key)1681   Status CreateEmptyDir(const std::string& bucket, const std::string& key) {
1682     DCHECK(!key.empty());
1683     return CreateEmptyObject(bucket, key + kSep);
1684   }
1685 
DeleteObject(const std::string & bucket,const std::string & key)1686   Status DeleteObject(const std::string& bucket, const std::string& key) {
1687     S3Model::DeleteObjectRequest req;
1688     req.SetBucket(ToAwsString(bucket));
1689     req.SetKey(ToAwsString(key));
1690     return OutcomeToStatus(
1691         std::forward_as_tuple("When delete key '", key, "' in bucket '", bucket, "': "),
1692         client_->DeleteObject(req));
1693   }
1694 
CopyObject(const S3Path & src_path,const S3Path & dest_path)1695   Status CopyObject(const S3Path& src_path, const S3Path& dest_path) {
1696     S3Model::CopyObjectRequest req;
1697     req.SetBucket(ToAwsString(dest_path.bucket));
1698     req.SetKey(ToAwsString(dest_path.key));
1699     // ARROW-13048: Copy source "Must be URL-encoded" according to AWS SDK docs.
1700     // However at least in 1.8 and 1.9 the SDK URL-encodes the path for you
1701     req.SetCopySource(src_path.ToAwsString());
1702     return OutcomeToStatus(
1703         std::forward_as_tuple("When copying key '", src_path.key, "' in bucket '",
1704                               src_path.bucket, "' to key '", dest_path.key,
1705                               "' in bucket '", dest_path.bucket, "': "),
1706         client_->CopyObject(req));
1707   }
1708 
1709   // On Minio, an empty "directory" doesn't satisfy the same API requests as
1710   // a non-empty "directory".  This is a Minio-specific quirk, but we need
1711   // to handle it for unit testing.
1712 
IsEmptyDirectory(const std::string & bucket,const std::string & key,bool * out)1713   Status IsEmptyDirectory(const std::string& bucket, const std::string& key, bool* out) {
1714     S3Model::HeadObjectRequest req;
1715     req.SetBucket(ToAwsString(bucket));
1716     if (backend_ && *backend_ == S3Backend::Minio) {
1717       // Minio wants a slash at the end, Amazon doesn't
1718       req.SetKey(ToAwsString(key) + kSep);
1719     } else {
1720       req.SetKey(ToAwsString(key));
1721     }
1722 
1723     auto outcome = client_->HeadObject(req);
1724     if (outcome.IsSuccess()) {
1725       *out = true;
1726       return Status::OK();
1727     }
1728     if (!backend_) {
1729       SaveBackend(outcome.GetError());
1730       DCHECK(backend_);
1731       if (*backend_ == S3Backend::Minio) {
1732         // Try again with separator-terminated key (see above)
1733         return IsEmptyDirectory(bucket, key, out);
1734       }
1735     }
1736     if (IsNotFound(outcome.GetError())) {
1737       *out = false;
1738       return Status::OK();
1739     }
1740     return ErrorToStatus(std::forward_as_tuple("When reading information for key '", key,
1741                                                "' in bucket '", bucket, "': "),
1742                          outcome.GetError());
1743   }
1744 
IsEmptyDirectory(const S3Path & path,bool * out)1745   Status IsEmptyDirectory(const S3Path& path, bool* out) {
1746     return IsEmptyDirectory(path.bucket, path.key, out);
1747   }
1748 
IsNonEmptyDirectory(const S3Path & path,bool * out)1749   Status IsNonEmptyDirectory(const S3Path& path, bool* out) {
1750     S3Model::ListObjectsV2Request req;
1751     req.SetBucket(ToAwsString(path.bucket));
1752     req.SetPrefix(ToAwsString(path.key) + kSep);
1753     req.SetDelimiter(Aws::String() + kSep);
1754     req.SetMaxKeys(1);
1755     auto outcome = client_->ListObjectsV2(req);
1756     if (outcome.IsSuccess()) {
1757       *out = outcome.GetResult().GetKeyCount() > 0;
1758       return Status::OK();
1759     }
1760     if (IsNotFound(outcome.GetError())) {
1761       *out = false;
1762       return Status::OK();
1763     }
1764     return ErrorToStatus(
1765         std::forward_as_tuple("When listing objects under key '", path.key,
1766                               "' in bucket '", path.bucket, "': "),
1767         outcome.GetError());
1768   }
1769 
CheckNestingDepth(int32_t nesting_depth)1770   Status CheckNestingDepth(int32_t nesting_depth) {
1771     if (nesting_depth >= kMaxNestingDepth) {
1772       return Status::IOError("S3 filesystem tree exceeds maximum nesting depth (",
1773                              kMaxNestingDepth, ")");
1774     }
1775     return Status::OK();
1776   }
1777 
1778   // A helper class for Walk and WalkAsync
1779   struct FileInfoCollector {
FileInfoCollectorarrow::fs::S3FileSystem::Impl::FileInfoCollector1780     FileInfoCollector(std::string bucket, std::string key, const FileSelector& select)
1781         : bucket(std::move(bucket)),
1782           key(std::move(key)),
1783           allow_not_found(select.allow_not_found) {}
1784 
Collectarrow::fs::S3FileSystem::Impl::FileInfoCollector1785     Status Collect(const std::string& prefix, const S3Model::ListObjectsV2Result& result,
1786                    std::vector<FileInfo>* out) {
1787       // Walk "directories"
1788       for (const auto& child_prefix : result.GetCommonPrefixes()) {
1789         is_empty = false;
1790         const auto child_key =
1791             internal::RemoveTrailingSlash(FromAwsString(child_prefix.GetPrefix()));
1792         std::stringstream child_path;
1793         child_path << bucket << kSep << child_key;
1794         FileInfo info;
1795         info.set_path(child_path.str());
1796         info.set_type(FileType::Directory);
1797         out->push_back(std::move(info));
1798       }
1799       // Walk "files"
1800       for (const auto& obj : result.GetContents()) {
1801         is_empty = false;
1802         FileInfo info;
1803         const auto child_key = internal::RemoveTrailingSlash(FromAwsString(obj.GetKey()));
1804         if (child_key == util::string_view(prefix)) {
1805           // Amazon can return the "directory" key itself as part of the results, skip
1806           continue;
1807         }
1808         std::stringstream child_path;
1809         child_path << bucket << kSep << child_key;
1810         info.set_path(child_path.str());
1811         FileObjectToInfo(obj, &info);
1812         out->push_back(std::move(info));
1813       }
1814       return Status::OK();
1815     }
1816 
Finisharrow::fs::S3FileSystem::Impl::FileInfoCollector1817     Status Finish(Impl* impl) {
1818       // If no contents were found, perhaps it's an empty "directory",
1819       // or perhaps it's a nonexistent entry.  Check.
1820       if (is_empty && !allow_not_found) {
1821         bool is_actually_empty;
1822         RETURN_NOT_OK(impl->IsEmptyDirectory(bucket, key, &is_actually_empty));
1823         if (!is_actually_empty) {
1824           return PathNotFound(bucket, key);
1825         }
1826       }
1827       return Status::OK();
1828     }
1829 
1830     std::string bucket;
1831     std::string key;
1832     bool allow_not_found;
1833     bool is_empty = true;
1834   };
1835 
1836   // Workhorse for GetFileInfo(FileSelector...)
Walk(const FileSelector & select,const std::string & bucket,const std::string & key,std::vector<FileInfo> * out)1837   Status Walk(const FileSelector& select, const std::string& bucket,
1838               const std::string& key, std::vector<FileInfo>* out) {
1839     FileInfoCollector collector(bucket, key, select);
1840 
1841     auto handle_error = [&](const AWSError<S3Errors>& error) -> Status {
1842       if (select.allow_not_found && IsNotFound(error)) {
1843         return Status::OK();
1844       }
1845       return ErrorToStatus(std::forward_as_tuple("When listing objects under key '", key,
1846                                                  "' in bucket '", bucket, "': "),
1847                            error);
1848     };
1849 
1850     auto handle_recursion = [&](int32_t nesting_depth) -> Result<bool> {
1851       RETURN_NOT_OK(CheckNestingDepth(nesting_depth));
1852       return select.recursive && nesting_depth <= select.max_recursion;
1853     };
1854 
1855     auto handle_results = [&](const std::string& prefix,
1856                               const S3Model::ListObjectsV2Result& result) -> Status {
1857       return collector.Collect(prefix, result, out);
1858     };
1859 
1860     RETURN_NOT_OK(TreeWalker::Walk(client_, io_context_, bucket, key, kListObjectsMaxKeys,
1861                                    handle_results, handle_error, handle_recursion));
1862 
1863     // If no contents were found, perhaps it's an empty "directory",
1864     // or perhaps it's a nonexistent entry.  Check.
1865     RETURN_NOT_OK(collector.Finish(this));
1866     // Sort results for convenience, since they can come massively out of order
1867     std::sort(out->begin(), out->end(), FileInfo::ByPath{});
1868     return Status::OK();
1869   }
1870 
1871   // Workhorse for GetFileInfoGenerator(FileSelector...)
WalkAsync(const FileSelector & select,const std::string & bucket,const std::string & key)1872   FileInfoGenerator WalkAsync(const FileSelector& select, const std::string& bucket,
1873                               const std::string& key) {
1874     PushGenerator<std::vector<FileInfo>> gen;
1875     auto producer = gen.producer();
1876     auto collector = std::make_shared<FileInfoCollector>(bucket, key, select);
1877     auto self = shared_from_this();
1878 
1879     auto handle_error = [select, bucket, key](const AWSError<S3Errors>& error) -> Status {
1880       if (select.allow_not_found && IsNotFound(error)) {
1881         return Status::OK();
1882       }
1883       return ErrorToStatus(std::forward_as_tuple("When listing objects under key '", key,
1884                                                  "' in bucket '", bucket, "': "),
1885                            error);
1886     };
1887 
1888     auto handle_recursion = [producer, select,
1889                              self](int32_t nesting_depth) -> Result<bool> {
1890       if (producer.is_closed()) {
1891         return false;
1892       }
1893       RETURN_NOT_OK(self->CheckNestingDepth(nesting_depth));
1894       return select.recursive && nesting_depth <= select.max_recursion;
1895     };
1896 
1897     auto handle_results =
1898         [collector, producer](
1899             const std::string& prefix,
1900             const S3Model::ListObjectsV2Result& result) mutable -> Status {
1901       std::vector<FileInfo> out;
1902       RETURN_NOT_OK(collector->Collect(prefix, result, &out));
1903       if (!out.empty()) {
1904         producer.Push(std::move(out));
1905       }
1906       return Status::OK();
1907     };
1908 
1909     TreeWalker::WalkAsync(client_, io_context_, bucket, key, kListObjectsMaxKeys,
1910                           handle_results, handle_error, handle_recursion)
1911         .AddCallback([collector, producer, self](const Status& status) mutable {
1912           auto st = collector->Finish(self.get());
1913           if (!st.ok()) {
1914             producer.Push(st);
1915           }
1916           producer.Close();
1917         });
1918     return gen;
1919   }
1920 
WalkForDeleteDir(const std::string & bucket,const std::string & key,std::vector<std::string> * file_keys,std::vector<std::string> * dir_keys)1921   Status WalkForDeleteDir(const std::string& bucket, const std::string& key,
1922                           std::vector<std::string>* file_keys,
1923                           std::vector<std::string>* dir_keys) {
1924     auto handle_results = [&](const std::string& prefix,
1925                               const S3Model::ListObjectsV2Result& result) -> Status {
1926       // Walk "files"
1927       file_keys->reserve(file_keys->size() + result.GetContents().size());
1928       for (const auto& obj : result.GetContents()) {
1929         file_keys->emplace_back(FromAwsString(obj.GetKey()));
1930       }
1931       // Walk "directories"
1932       dir_keys->reserve(dir_keys->size() + result.GetCommonPrefixes().size());
1933       for (const auto& prefix : result.GetCommonPrefixes()) {
1934         dir_keys->emplace_back(FromAwsString(prefix.GetPrefix()));
1935       }
1936       return Status::OK();
1937     };
1938 
1939     auto handle_error = [&](const AWSError<S3Errors>& error) -> Status {
1940       return ErrorToStatus(std::forward_as_tuple("When listing objects under key '", key,
1941                                                  "' in bucket '", bucket, "': "),
1942                            error);
1943     };
1944 
1945     auto handle_recursion = [&](int32_t nesting_depth) -> Result<bool> {
1946       RETURN_NOT_OK(CheckNestingDepth(nesting_depth));
1947       return true;  // Recurse
1948     };
1949 
1950     return TreeWalker::Walk(client_, io_context_, bucket, key, kListObjectsMaxKeys,
1951                             handle_results, handle_error, handle_recursion);
1952   }
1953 
1954   // Delete multiple objects at once
DeleteObjectsAsync(const std::string & bucket,const std::vector<std::string> & keys)1955   Future<> DeleteObjectsAsync(const std::string& bucket,
1956                               const std::vector<std::string>& keys) {
1957     struct DeleteCallback {
1958       const std::string bucket;
1959 
1960       Status operator()(const S3Model::DeleteObjectsOutcome& outcome) {
1961         if (!outcome.IsSuccess()) {
1962           return ErrorToStatus(outcome.GetError());
1963         }
1964         // Also need to check per-key errors, even on successful outcome
1965         // See
1966         // https://docs.aws.amazon.com/fr_fr/AmazonS3/latest/API/multiobjectdeleteapi.html
1967         const auto& errors = outcome.GetResult().GetErrors();
1968         if (!errors.empty()) {
1969           std::stringstream ss;
1970           ss << "Got the following " << errors.size()
1971              << " errors when deleting objects in S3 bucket '" << bucket << "':\n";
1972           for (const auto& error : errors) {
1973             ss << "- key '" << error.GetKey() << "': " << error.GetMessage() << "\n";
1974           }
1975           return Status::IOError(ss.str());
1976         }
1977         return Status::OK();
1978       }
1979     };
1980 
1981     const auto chunk_size = static_cast<size_t>(kMultipleDeleteMaxKeys);
1982     DeleteCallback delete_cb{bucket};
1983     auto client = client_;
1984 
1985     std::vector<Future<>> futures;
1986     futures.reserve(keys.size() / chunk_size + 1);
1987 
1988     for (size_t start = 0; start < keys.size(); start += chunk_size) {
1989       S3Model::DeleteObjectsRequest req;
1990       S3Model::Delete del;
1991       for (size_t i = start; i < std::min(keys.size(), chunk_size); ++i) {
1992         del.AddObjects(S3Model::ObjectIdentifier().WithKey(ToAwsString(keys[i])));
1993       }
1994       req.SetBucket(ToAwsString(bucket));
1995       req.SetDelete(std::move(del));
1996       ARROW_ASSIGN_OR_RAISE(auto fut, SubmitIO(io_context_, [client, req]() {
1997                               return client->DeleteObjects(req);
1998                             }));
1999       futures.push_back(std::move(fut).Then(delete_cb));
2000     }
2001 
2002     return AllComplete(futures);
2003   }
2004 
DeleteObjects(const std::string & bucket,const std::vector<std::string> & keys)2005   Status DeleteObjects(const std::string& bucket, const std::vector<std::string>& keys) {
2006     return DeleteObjectsAsync(bucket, keys).status();
2007   }
2008 
DeleteDirContents(const std::string & bucket,const std::string & key)2009   Status DeleteDirContents(const std::string& bucket, const std::string& key) {
2010     std::vector<std::string> file_keys;
2011     std::vector<std::string> dir_keys;
2012     RETURN_NOT_OK(WalkForDeleteDir(bucket, key, &file_keys, &dir_keys));
2013     if (file_keys.empty() && dir_keys.empty() && !key.empty()) {
2014       // No contents found, is it an empty directory?
2015       bool exists = false;
2016       RETURN_NOT_OK(IsEmptyDirectory(bucket, key, &exists));
2017       if (!exists) {
2018         return PathNotFound(bucket, key);
2019       }
2020     }
2021     // First delete all "files", then delete all child "directories"
2022     RETURN_NOT_OK(DeleteObjects(bucket, file_keys));
2023     // Delete directories in reverse lexicographic order, to ensure children
2024     // are deleted before their parents (Minio).
2025     std::sort(dir_keys.rbegin(), dir_keys.rend());
2026     return DeleteObjects(bucket, dir_keys);
2027   }
2028 
EnsureDirectoryExists(const S3Path & path)2029   Status EnsureDirectoryExists(const S3Path& path) {
2030     if (!path.key.empty()) {
2031       return CreateEmptyDir(path.bucket, path.key);
2032     }
2033     return Status::OK();
2034   }
2035 
EnsureParentExists(const S3Path & path)2036   Status EnsureParentExists(const S3Path& path) {
2037     if (path.has_parent()) {
2038       return EnsureDirectoryExists(path.parent());
2039     }
2040     return Status::OK();
2041   }
2042 
ProcessListBuckets(const Aws::S3::Model::ListBucketsOutcome & outcome)2043   static Result<std::vector<std::string>> ProcessListBuckets(
2044       const Aws::S3::Model::ListBucketsOutcome& outcome) {
2045     if (!outcome.IsSuccess()) {
2046       return ErrorToStatus(std::forward_as_tuple("When listing buckets: "),
2047                            outcome.GetError());
2048     }
2049     std::vector<std::string> buckets;
2050     buckets.reserve(outcome.GetResult().GetBuckets().size());
2051     for (const auto& bucket : outcome.GetResult().GetBuckets()) {
2052       buckets.emplace_back(FromAwsString(bucket.GetName()));
2053     }
2054     return buckets;
2055   }
2056 
ListBuckets()2057   Result<std::vector<std::string>> ListBuckets() {
2058     auto outcome = client_->ListBuckets();
2059     return ProcessListBuckets(outcome);
2060   }
2061 
ListBucketsAsync(io::IOContext ctx)2062   Future<std::vector<std::string>> ListBucketsAsync(io::IOContext ctx) {
2063     auto self = shared_from_this();
2064     return DeferNotOk(SubmitIO(ctx, [self]() { return self->client_->ListBuckets(); }))
2065         // TODO(ARROW-12655) Change to Then(Impl::ProcessListBuckets)
2066         .Then([](const Aws::S3::Model::ListBucketsOutcome& outcome) {
2067           return Impl::ProcessListBuckets(outcome);
2068         });
2069   }
2070 
OpenInputFile(const std::string & s,S3FileSystem * fs)2071   Result<std::shared_ptr<ObjectInputFile>> OpenInputFile(const std::string& s,
2072                                                          S3FileSystem* fs) {
2073     ARROW_ASSIGN_OR_RAISE(auto path, S3Path::FromString(s));
2074     RETURN_NOT_OK(ValidateFilePath(path));
2075 
2076     auto ptr = std::make_shared<ObjectInputFile>(client_, fs->io_context(), path);
2077     RETURN_NOT_OK(ptr->Init());
2078     return ptr;
2079   }
2080 
OpenInputFile(const FileInfo & info,S3FileSystem * fs)2081   Result<std::shared_ptr<ObjectInputFile>> OpenInputFile(const FileInfo& info,
2082                                                          S3FileSystem* fs) {
2083     if (info.type() == FileType::NotFound) {
2084       return ::arrow::fs::internal::PathNotFound(info.path());
2085     }
2086     if (info.type() != FileType::File && info.type() != FileType::Unknown) {
2087       return ::arrow::fs::internal::NotAFile(info.path());
2088     }
2089 
2090     ARROW_ASSIGN_OR_RAISE(auto path, S3Path::FromString(info.path()));
2091     RETURN_NOT_OK(ValidateFilePath(path));
2092 
2093     auto ptr =
2094         std::make_shared<ObjectInputFile>(client_, fs->io_context(), path, info.size());
2095     RETURN_NOT_OK(ptr->Init());
2096     return ptr;
2097   }
2098 };
2099 
// Private constructor (use S3FileSystem::Make).  Native async implementations
// exist (e.g. GetFileInfoGenerator below), so the generic sync-as-async
// fallback is disabled.
S3FileSystem::S3FileSystem(const S3Options& options, const io::IOContext& io_context)
    : FileSystem(io_context), impl_(std::make_shared<Impl>(options, io_context)) {
  default_async_is_sync_ = false;
}
2104 
// Out-of-line destructor: Impl is only fully defined in this translation unit.
S3FileSystem::~S3FileSystem() {}
2106 
Make(const S3Options & options,const io::IOContext & io_context)2107 Result<std::shared_ptr<S3FileSystem>> S3FileSystem::Make(
2108     const S3Options& options, const io::IOContext& io_context) {
2109   RETURN_NOT_OK(CheckS3Initialized());
2110 
2111   std::shared_ptr<S3FileSystem> ptr(new S3FileSystem(options, io_context));
2112   RETURN_NOT_OK(ptr->impl_->Init());
2113   return ptr;
2114 }
2115 
Equals(const FileSystem & other) const2116 bool S3FileSystem::Equals(const FileSystem& other) const {
2117   if (this == &other) {
2118     return true;
2119   }
2120   if (other.type_name() != type_name()) {
2121     return false;
2122   }
2123   const auto& s3fs = ::arrow::internal::checked_cast<const S3FileSystem&>(other);
2124   return options().Equals(s3fs.options());
2125 }
2126 
// Return (a copy of) the options this filesystem was constructed with.
S3Options S3FileSystem::options() const { return impl_->options(); }
2128 
// Return the AWS region the underlying client is configured for.
std::string S3FileSystem::region() const { return impl_->region(); }
2130 
GetFileInfo(const std::string & s)2131 Result<FileInfo> S3FileSystem::GetFileInfo(const std::string& s) {
2132   ARROW_ASSIGN_OR_RAISE(auto path, S3Path::FromString(s));
2133   FileInfo info;
2134   info.set_path(s);
2135 
2136   if (path.empty()) {
2137     // It's the root path ""
2138     info.set_type(FileType::Directory);
2139     return info;
2140   } else if (path.key.empty()) {
2141     // It's a bucket
2142     S3Model::HeadBucketRequest req;
2143     req.SetBucket(ToAwsString(path.bucket));
2144 
2145     auto outcome = impl_->client_->HeadBucket(req);
2146     if (!outcome.IsSuccess()) {
2147       if (!IsNotFound(outcome.GetError())) {
2148         return ErrorToStatus(
2149             std::forward_as_tuple("When getting information for bucket '", path.bucket,
2150                                   "': "),
2151             outcome.GetError());
2152       }
2153       info.set_type(FileType::NotFound);
2154       return info;
2155     }
2156     // NOTE: S3 doesn't have a bucket modification time.  Only a creation
2157     // time is available, and you have to list all buckets to get it.
2158     info.set_type(FileType::Directory);
2159     return info;
2160   } else {
2161     // It's an object
2162     S3Model::HeadObjectRequest req;
2163     req.SetBucket(ToAwsString(path.bucket));
2164     req.SetKey(ToAwsString(path.key));
2165 
2166     auto outcome = impl_->client_->HeadObject(req);
2167     if (outcome.IsSuccess()) {
2168       // "File" object found
2169       FileObjectToInfo(outcome.GetResult(), &info);
2170       return info;
2171     }
2172     if (!IsNotFound(outcome.GetError())) {
2173       return ErrorToStatus(
2174           std::forward_as_tuple("When getting information for key '", path.key,
2175                                 "' in bucket '", path.bucket, "': "),
2176           outcome.GetError());
2177     }
2178     // Not found => perhaps it's an empty "directory"
2179     bool is_dir = false;
2180     RETURN_NOT_OK(impl_->IsEmptyDirectory(path, &is_dir));
2181     if (is_dir) {
2182       info.set_type(FileType::Directory);
2183       return info;
2184     }
2185     // Not found => perhaps it's a non-empty "directory"
2186     RETURN_NOT_OK(impl_->IsNonEmptyDirectory(path, &is_dir));
2187     if (is_dir) {
2188       info.set_type(FileType::Directory);
2189     } else {
2190       info.set_type(FileType::NotFound);
2191     }
2192     return info;
2193   }
2194 }
2195 
GetFileInfo(const FileSelector & select)2196 Result<FileInfoVector> S3FileSystem::GetFileInfo(const FileSelector& select) {
2197   ARROW_ASSIGN_OR_RAISE(auto base_path, S3Path::FromString(select.base_dir));
2198 
2199   FileInfoVector results;
2200 
2201   if (base_path.empty()) {
2202     // List all buckets
2203     ARROW_ASSIGN_OR_RAISE(auto buckets, impl_->ListBuckets());
2204     for (const auto& bucket : buckets) {
2205       FileInfo info;
2206       info.set_path(bucket);
2207       info.set_type(FileType::Directory);
2208       results.push_back(std::move(info));
2209       if (select.recursive) {
2210         RETURN_NOT_OK(impl_->Walk(select, bucket, "", &results));
2211       }
2212     }
2213     return results;
2214   }
2215 
2216   // Nominal case -> walk a single bucket
2217   RETURN_NOT_OK(impl_->Walk(select, base_path.bucket, base_path.key, &results));
2218   return results;
2219 }
2220 
FileInfoGenerator S3FileSystem::GetFileInfoGenerator(const FileSelector& select) {
  // Parse the base directory eagerly; a malformed path surfaces as a
  // failing generator rather than an error thrown from this call.
  auto maybe_base_path = S3Path::FromString(select.base_dir);
  if (!maybe_base_path.ok()) {
    return MakeFailingGenerator<FileInfoVector>(maybe_base_path.status());
  }
  auto base_path = *std::move(maybe_base_path);

  if (base_path.empty()) {
    // List all buckets, then possibly recurse
    PushGenerator<AsyncGenerator<FileInfoVector>> gen;
    auto producer = gen.producer();

    auto fut = impl_->ListBucketsAsync(io_context());
    // Capture a shared_ptr to the impl so it stays alive until the
    // async callback below has run.
    auto impl = impl_->shared_from_this();
    fut.AddCallback(
        [producer, select, impl](const Result<std::vector<std::string>>& res) mutable {
          if (!res.ok()) {
            // Propagate the listing error through the generator stream.
            producer.Push(res.status());
            producer.Close();
            return;
          }
          FileInfoVector buckets;
          for (const auto& bucket : *res) {
            buckets.push_back(FileInfo{bucket, FileType::Directory});
          }
          // Generate all bucket infos
          auto buckets_fut = Future<FileInfoVector>::MakeFinished(std::move(buckets));
          producer.Push(MakeSingleFutureGenerator(buckets_fut));
          if (select.recursive) {
            // Generate recursive walk for each bucket in turn
            for (const auto& bucket : *buckets_fut.result()) {
              producer.Push(impl->WalkAsync(select, bucket.path(), ""));
            }
          }
          producer.Close();
        });

    // Flatten the generator-of-generators into a single FileInfoVector stream.
    return MakeConcatenatedGenerator(
        AsyncGenerator<AsyncGenerator<FileInfoVector>>{std::move(gen)});
  }

  // Nominal case -> walk a single bucket
  return impl_->WalkAsync(select, base_path.bucket, base_path.key);
}
2265 
CreateDir(const std::string & s,bool recursive)2266 Status S3FileSystem::CreateDir(const std::string& s, bool recursive) {
2267   ARROW_ASSIGN_OR_RAISE(auto path, S3Path::FromString(s));
2268 
2269   if (path.key.empty()) {
2270     // Create bucket
2271     return impl_->CreateBucket(path.bucket);
2272   }
2273 
2274   // Create object
2275   if (recursive) {
2276     // Ensure bucket exists
2277     ARROW_ASSIGN_OR_RAISE(bool bucket_exists, impl_->BucketExists(path.bucket));
2278     if (!bucket_exists) {
2279       RETURN_NOT_OK(impl_->CreateBucket(path.bucket));
2280     }
2281     // Ensure that all parents exist, then the directory itself
2282     std::string parent_key;
2283     for (const auto& part : path.key_parts) {
2284       parent_key += part;
2285       parent_key += kSep;
2286       RETURN_NOT_OK(impl_->CreateEmptyObject(path.bucket, parent_key));
2287     }
2288     return Status::OK();
2289   } else {
2290     // Check parent dir exists
2291     if (path.has_parent()) {
2292       S3Path parent_path = path.parent();
2293       bool exists;
2294       RETURN_NOT_OK(impl_->IsNonEmptyDirectory(parent_path, &exists));
2295       if (!exists) {
2296         RETURN_NOT_OK(impl_->IsEmptyDirectory(parent_path, &exists));
2297       }
2298       if (!exists) {
2299         return Status::IOError("Cannot create directory '", path.full_path,
2300                                "': parent directory does not exist");
2301       }
2302     }
2303 
2304     // XXX Should we check that no non-directory entry exists?
2305     // Minio does it for us, not sure about other S3 implementations.
2306     return impl_->CreateEmptyDir(path.bucket, path.key);
2307   }
2308 }
2309 
DeleteDir(const std::string & s)2310 Status S3FileSystem::DeleteDir(const std::string& s) {
2311   ARROW_ASSIGN_OR_RAISE(auto path, S3Path::FromString(s));
2312 
2313   if (path.empty()) {
2314     return Status::NotImplemented("Cannot delete all S3 buckets");
2315   }
2316   RETURN_NOT_OK(impl_->DeleteDirContents(path.bucket, path.key));
2317   if (path.key.empty()) {
2318     // Delete bucket
2319     S3Model::DeleteBucketRequest req;
2320     req.SetBucket(ToAwsString(path.bucket));
2321     return OutcomeToStatus(
2322         std::forward_as_tuple("When deleting bucket '", path.bucket, "': "),
2323         impl_->client_->DeleteBucket(req));
2324   } else {
2325     // Delete "directory"
2326     RETURN_NOT_OK(impl_->DeleteObject(path.bucket, path.key + kSep));
2327     // Parent may be implicitly deleted if it became empty, recreate it
2328     return impl_->EnsureParentExists(path);
2329   }
2330 }
2331 
Status S3FileSystem::DeleteDirContents(const std::string& s) {
  ARROW_ASSIGN_OR_RAISE(auto path, S3Path::FromString(s));

  // An empty path would mean wiping every bucket; refuse.
  if (path.empty()) {
    return Status::NotImplemented("Cannot delete all S3 buckets");
  }
  // Delete every object under the prefix, then make sure the directory
  // itself still exists (it may have been implicitly deleted above).
  RETURN_NOT_OK(impl_->DeleteDirContents(path.bucket, path.key));
  // Directory may be implicitly deleted, recreate it
  return impl_->EnsureDirectoryExists(path);
}
2342 
Status S3FileSystem::DeleteRootDirContents() {
  // The S3 "root" is the set of all buckets; deleting its contents would
  // mean dropping every bucket, which is deliberately unsupported.
  return Status::NotImplemented("Cannot delete all S3 buckets");
}
2346 
DeleteFile(const std::string & s)2347 Status S3FileSystem::DeleteFile(const std::string& s) {
2348   ARROW_ASSIGN_OR_RAISE(auto path, S3Path::FromString(s));
2349   RETURN_NOT_OK(ValidateFilePath(path));
2350 
2351   // Check the object exists
2352   S3Model::HeadObjectRequest req;
2353   req.SetBucket(ToAwsString(path.bucket));
2354   req.SetKey(ToAwsString(path.key));
2355 
2356   auto outcome = impl_->client_->HeadObject(req);
2357   if (!outcome.IsSuccess()) {
2358     if (IsNotFound(outcome.GetError())) {
2359       return PathNotFound(path);
2360     } else {
2361       return ErrorToStatus(
2362           std::forward_as_tuple("When getting information for key '", path.key,
2363                                 "' in bucket '", path.bucket, "': "),
2364           outcome.GetError());
2365     }
2366   }
2367   // Object found, delete it
2368   RETURN_NOT_OK(impl_->DeleteObject(path.bucket, path.key));
2369   // Parent may be implicitly deleted if it became empty, recreate it
2370   return impl_->EnsureParentExists(path);
2371 }
2372 
Move(const std::string & src,const std::string & dest)2373 Status S3FileSystem::Move(const std::string& src, const std::string& dest) {
2374   // XXX We don't implement moving directories as it would be too expensive:
2375   // one must copy all directory contents one by one (including object data),
2376   // then delete the original contents.
2377 
2378   ARROW_ASSIGN_OR_RAISE(auto src_path, S3Path::FromString(src));
2379   RETURN_NOT_OK(ValidateFilePath(src_path));
2380   ARROW_ASSIGN_OR_RAISE(auto dest_path, S3Path::FromString(dest));
2381   RETURN_NOT_OK(ValidateFilePath(dest_path));
2382 
2383   if (src_path == dest_path) {
2384     return Status::OK();
2385   }
2386   RETURN_NOT_OK(impl_->CopyObject(src_path, dest_path));
2387   RETURN_NOT_OK(impl_->DeleteObject(src_path.bucket, src_path.key));
2388   // Source parent may be implicitly deleted if it became empty, recreate it
2389   return impl_->EnsureParentExists(src_path);
2390 }
2391 
CopyFile(const std::string & src,const std::string & dest)2392 Status S3FileSystem::CopyFile(const std::string& src, const std::string& dest) {
2393   ARROW_ASSIGN_OR_RAISE(auto src_path, S3Path::FromString(src));
2394   RETURN_NOT_OK(ValidateFilePath(src_path));
2395   ARROW_ASSIGN_OR_RAISE(auto dest_path, S3Path::FromString(dest));
2396   RETURN_NOT_OK(ValidateFilePath(dest_path));
2397 
2398   if (src_path == dest_path) {
2399     return Status::OK();
2400   }
2401   return impl_->CopyObject(src_path, dest_path);
2402 }
2403 
Result<std::shared_ptr<io::InputStream>> S3FileSystem::OpenInputStream(
    const std::string& s) {
  // Open the object at path `s` for sequential reading; delegates to the
  // shared impl helper (same one backing OpenInputFile).
  return impl_->OpenInputFile(s, this);
}
2408 
Result<std::shared_ptr<io::InputStream>> S3FileSystem::OpenInputStream(
    const FileInfo& info) {
  // FileInfo overload: lets the impl reuse already-known metadata
  // (presumably avoiding a redundant HEAD request — confirm in impl).
  return impl_->OpenInputFile(info, this);
}
2413 
Result<std::shared_ptr<io::RandomAccessFile>> S3FileSystem::OpenInputFile(
    const std::string& s) {
  // Open the object at path `s` for random-access reading.
  return impl_->OpenInputFile(s, this);
}
2418 
Result<std::shared_ptr<io::RandomAccessFile>> S3FileSystem::OpenInputFile(
    const FileInfo& info) {
  // FileInfo overload: lets the impl reuse already-known metadata
  // (presumably avoiding a redundant HEAD request — confirm in impl).
  return impl_->OpenInputFile(info, this);
}
2423 
OpenOutputStream(const std::string & s,const std::shared_ptr<const KeyValueMetadata> & metadata)2424 Result<std::shared_ptr<io::OutputStream>> S3FileSystem::OpenOutputStream(
2425     const std::string& s, const std::shared_ptr<const KeyValueMetadata>& metadata) {
2426   ARROW_ASSIGN_OR_RAISE(auto path, S3Path::FromString(s));
2427   RETURN_NOT_OK(ValidateFilePath(path));
2428 
2429   auto ptr = std::make_shared<ObjectOutputStream>(impl_->client_, io_context(), path,
2430                                                   impl_->options(), metadata);
2431   RETURN_NOT_OK(ptr->Init());
2432   return ptr;
2433 }
2434 
Result<std::shared_ptr<io::OutputStream>> S3FileSystem::OpenAppendStream(
    const std::string& path, const std::shared_ptr<const KeyValueMetadata>& metadata) {
  // S3 objects are immutable: appending would require rewriting the object.
  // XXX Investigate UploadPartCopy? Does it work with source == destination?
  // https://docs.aws.amazon.com/AmazonS3/latest/API/mpUploadUploadPartCopy.html
  // (but would need to fall back to GET if the current data is < 5 MB)
  return Status::NotImplemented("It is not possible to append efficiently to S3 objects");
}
2442 
2443 //
2444 // Top-level utility functions
2445 //
2446 
Result<std::string> ResolveBucketRegion(const std::string& bucket) {
  // Look up the AWS region hosting `bucket` via the process-wide default
  // resolver (presumably a cached singleton — see RegionResolver).
  ARROW_ASSIGN_OR_RAISE(auto resolver, RegionResolver::DefaultInstance());
  return resolver->ResolveRegion(bucket);
}
2451 
2452 }  // namespace fs
2453 }  // namespace arrow
2454