1 // Licensed to the Apache Software Foundation (ASF) under one
2 // or more contributor license agreements. See the NOTICE file
3 // distributed with this work for additional information
4 // regarding copyright ownership. The ASF licenses this file
5 // to you under the Apache License, Version 2.0 (the
6 // "License"); you may not use this file except in compliance
7 // with the License. You may obtain a copy of the License at
8 //
9 // http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing,
12 // software distributed under the License is distributed on an
13 // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14 // KIND, either express or implied. See the License for the
15 // specific language governing permissions and limitations
16 // under the License.
17
18 #include "arrow/filesystem/s3fs.h"
19
20 #include <algorithm>
21 #include <atomic>
22 #include <condition_variable>
23 #include <functional>
24 #include <mutex>
25 #include <sstream>
26 #include <unordered_map>
27 #include <utility>
28
29 #ifdef _WIN32
30 // Undefine preprocessor macros that interfere with AWS function / method names
31 #ifdef GetMessage
32 #undef GetMessage
33 #endif
34 #ifdef GetObject
35 #undef GetObject
36 #endif
37 #endif
38
39 #include <aws/core/Aws.h>
40 #include <aws/core/Region.h>
41 #include <aws/core/auth/AWSCredentials.h>
42 #include <aws/core/auth/AWSCredentialsProviderChain.h>
43 #include <aws/core/client/RetryStrategy.h>
44 #include <aws/core/http/HttpResponse.h>
45 #include <aws/core/utils/logging/ConsoleLogSystem.h>
46 #include <aws/core/utils/stream/PreallocatedStreamBuf.h>
47 #include <aws/identity-management/auth/STSAssumeRoleCredentialsProvider.h>
48 #include <aws/s3/S3Client.h>
49 #include <aws/s3/model/AbortMultipartUploadRequest.h>
50 #include <aws/s3/model/CompleteMultipartUploadRequest.h>
51 #include <aws/s3/model/CompletedMultipartUpload.h>
52 #include <aws/s3/model/CompletedPart.h>
53 #include <aws/s3/model/CopyObjectRequest.h>
54 #include <aws/s3/model/CreateBucketRequest.h>
55 #include <aws/s3/model/CreateMultipartUploadRequest.h>
56 #include <aws/s3/model/DeleteBucketRequest.h>
57 #include <aws/s3/model/DeleteObjectRequest.h>
58 #include <aws/s3/model/DeleteObjectsRequest.h>
59 #include <aws/s3/model/GetObjectRequest.h>
60 #include <aws/s3/model/HeadBucketRequest.h>
61 #include <aws/s3/model/HeadObjectRequest.h>
62 #include <aws/s3/model/ListBucketsResult.h>
63 #include <aws/s3/model/ListObjectsV2Request.h>
64 #include <aws/s3/model/PutObjectRequest.h>
65 #include <aws/s3/model/UploadPartRequest.h>
66
67 #include "arrow/buffer.h"
68 #include "arrow/filesystem/filesystem.h"
69 #include "arrow/filesystem/path_util.h"
70 #include "arrow/filesystem/s3_internal.h"
71 #include "arrow/filesystem/util_internal.h"
72 #include "arrow/io/interfaces.h"
73 #include "arrow/io/memory.h"
74 #include "arrow/io/util_internal.h"
75 #include "arrow/result.h"
76 #include "arrow/status.h"
77 #include "arrow/util/atomic_shared_ptr.h"
78 #include "arrow/util/checked_cast.h"
79 #include "arrow/util/future.h"
80 #include "arrow/util/logging.h"
81 #include "arrow/util/optional.h"
82 #include "arrow/util/windows_fixup.h"
83
84 namespace arrow {
85
86 using internal::Uri;
87
88 namespace fs {
89
90 using ::Aws::Client::AWSError;
91 using ::Aws::S3::S3Errors;
92 namespace S3Model = Aws::S3::Model;
93
94 using internal::ConnectRetryStrategy;
95 using internal::DetectS3Backend;
96 using internal::ErrorToStatus;
97 using internal::FromAwsDatetime;
98 using internal::FromAwsString;
99 using internal::IsAlreadyExists;
100 using internal::IsNotFound;
101 using internal::OutcomeToResult;
102 using internal::OutcomeToStatus;
103 using internal::S3Backend;
104 using internal::ToAwsString;
105 using internal::ToURLEncodedAwsString;
106
107 static const char kSep = '/';
108
109 namespace {
110
111 std::mutex aws_init_lock;
112 Aws::SDKOptions aws_options;
113 std::atomic<bool> aws_initialized(false);
114
Status DoInitializeS3(const S3GlobalOptions& options) {
116 Aws::Utils::Logging::LogLevel aws_log_level;
117
118 #define LOG_LEVEL_CASE(level_name) \
119 case S3LogLevel::level_name: \
120 aws_log_level = Aws::Utils::Logging::LogLevel::level_name; \
121 break;
122
123 switch (options.log_level) {
124 LOG_LEVEL_CASE(Fatal)
125 LOG_LEVEL_CASE(Error)
126 LOG_LEVEL_CASE(Warn)
127 LOG_LEVEL_CASE(Info)
128 LOG_LEVEL_CASE(Debug)
129 LOG_LEVEL_CASE(Trace)
130 default:
131 aws_log_level = Aws::Utils::Logging::LogLevel::Off;
132 }
133
134 #undef LOG_LEVEL_CASE
135
136 aws_options.loggingOptions.logLevel = aws_log_level;
// By default the AWS SDK logs to files; log to the console instead
138 aws_options.loggingOptions.logger_create_fn = [] {
139 return std::make_shared<Aws::Utils::Logging::ConsoleLogSystem>(
140 aws_options.loggingOptions.logLevel);
141 };
142 Aws::InitAPI(aws_options);
143 aws_initialized.store(true);
144 return Status::OK();
145 }
146
147 } // namespace
148
Status InitializeS3(const S3GlobalOptions& options) {
150 std::lock_guard<std::mutex> lock(aws_init_lock);
151 return DoInitializeS3(options);
152 }
153
Status FinalizeS3() {
155 std::lock_guard<std::mutex> lock(aws_init_lock);
156 Aws::ShutdownAPI(aws_options);
157 aws_initialized.store(false);
158 return Status::OK();
159 }
160
Status EnsureS3Initialized() {
162 std::lock_guard<std::mutex> lock(aws_init_lock);
163 if (!aws_initialized.load()) {
164 S3GlobalOptions options{S3LogLevel::Fatal};
165 return DoInitializeS3(options);
166 }
167 return Status::OK();
168 }
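// A minimal usage sketch (not part of this file): callers typically run
//   RETURN_NOT_OK(arrow::fs::InitializeS3(S3GlobalOptions{S3LogLevel::Fatal}));
//   ... create and use S3FileSystem instances ...
//   RETURN_NOT_OK(arrow::fs::FinalizeS3());
// EnsureS3Initialized() is the lazy fallback used when the application did not
// call InitializeS3() itself.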
169
170 // -----------------------------------------------------------------------
171 // S3Options implementation
172
void S3Options::ConfigureDefaultCredentials() {
174 credentials_provider =
175 std::make_shared<Aws::Auth::DefaultAWSCredentialsProviderChain>();
176 }
177
void S3Options::ConfigureAnonymousCredentials() {
179 credentials_provider = std::make_shared<Aws::Auth::AnonymousAWSCredentialsProvider>();
180 }
181
void S3Options::ConfigureAccessKey(const std::string& access_key,
183 const std::string& secret_key,
184 const std::string& session_token) {
185 credentials_provider = std::make_shared<Aws::Auth::SimpleAWSCredentialsProvider>(
186 ToAwsString(access_key), ToAwsString(secret_key), ToAwsString(session_token));
187 }
188
void S3Options::ConfigureAssumeRoleCredentials(
190 const std::string& role_arn, const std::string& session_name,
191 const std::string& external_id, int load_frequency,
192 const std::shared_ptr<Aws::STS::STSClient>& stsClient) {
193 credentials_provider = std::make_shared<Aws::Auth::STSAssumeRoleCredentialsProvider>(
194 ToAwsString(role_arn), ToAwsString(session_name), ToAwsString(external_id),
195 load_frequency, stsClient);
196 }
197
std::string S3Options::GetAccessKey() const {
199 auto credentials = credentials_provider->GetAWSCredentials();
200 return std::string(FromAwsString(credentials.GetAWSAccessKeyId()));
201 }
202
std::string S3Options::GetSecretKey() const {
204 auto credentials = credentials_provider->GetAWSCredentials();
205 return std::string(FromAwsString(credentials.GetAWSSecretKey()));
206 }
207
std::string S3Options::GetSessionToken() const {
209 auto credentials = credentials_provider->GetAWSCredentials();
210 return std::string(FromAwsString(credentials.GetSessionToken()));
211 }
212
S3Options S3Options::Defaults() {
214 S3Options options;
215 options.ConfigureDefaultCredentials();
216 return options;
217 }
218
S3Options S3Options::Anonymous() {
220 S3Options options;
221 options.ConfigureAnonymousCredentials();
222 return options;
223 }
224
S3Options S3Options::FromAccessKey(const std::string& access_key,
226 const std::string& secret_key,
227 const std::string& session_token) {
228 S3Options options;
229 options.ConfigureAccessKey(access_key, secret_key, session_token);
230 return options;
231 }
232
S3Options S3Options::FromAssumeRole(
234 const std::string& role_arn, const std::string& session_name,
235 const std::string& external_id, int load_frequency,
236 const std::shared_ptr<Aws::STS::STSClient>& stsClient) {
237 S3Options options;
238 options.role_arn = role_arn;
239 options.session_name = session_name;
240 options.external_id = external_id;
241 options.load_frequency = load_frequency;
242 options.ConfigureAssumeRoleCredentials(role_arn, session_name, external_id,
243 load_frequency, stsClient);
244 return options;
245 }
246
Result<S3Options> S3Options::FromUri(const Uri& uri, std::string* out_path) {
248 S3Options options;
249
250 const auto bucket = uri.host();
251 auto path = uri.path();
252 if (bucket.empty()) {
253 if (!path.empty()) {
254 return Status::Invalid("Missing bucket name in S3 URI");
255 }
256 } else {
257 if (path.empty()) {
258 path = bucket;
259 } else {
260 if (path[0] != '/') {
return Status::Invalid("S3 URI should be absolute, not relative");
262 }
263 path = bucket + path;
264 }
265 }
266 if (out_path != nullptr) {
267 *out_path = std::string(internal::RemoveTrailingSlash(path));
268 }
269
270 std::unordered_map<std::string, std::string> options_map;
271 ARROW_ASSIGN_OR_RAISE(const auto options_items, uri.query_items());
272 for (const auto& kv : options_items) {
273 options_map.emplace(kv.first, kv.second);
274 }
275
276 const auto username = uri.username();
277 if (!username.empty()) {
278 options.ConfigureAccessKey(username, uri.password());
279 } else {
280 options.ConfigureDefaultCredentials();
281 }
282
283 bool region_set = false;
284 for (const auto& kv : options_map) {
285 if (kv.first == "region") {
286 options.region = kv.second;
287 region_set = true;
288 } else if (kv.first == "scheme") {
289 options.scheme = kv.second;
290 } else if (kv.first == "endpoint_override") {
291 options.endpoint_override = kv.second;
292 } else {
293 return Status::Invalid("Unexpected query parameter in S3 URI: '", kv.first, "'");
294 }
295 }
296
297 if (!region_set && !bucket.empty() && options.endpoint_override.empty()) {
298 // XXX Should we use a dedicated resolver with the given credentials?
299 ARROW_ASSIGN_OR_RAISE(options.region, ResolveBucketRegion(bucket));
300 }
301
302 return options;
303 }
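// Example (a sketch with made-up values): parsing
//   "s3://AKIAXXX:SECRETXXX@some-bucket/some/key?region=us-east-2"
// configures access-key credentials from the username/password component,
// takes the region from the query string, and sets *out_path to
// "some-bucket/some/key".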
304
Result<S3Options> S3Options::FromUri(const std::string& uri_string,
306 std::string* out_path) {
307 Uri uri;
308 RETURN_NOT_OK(uri.Parse(uri_string));
309 return FromUri(uri, out_path);
310 }
311
bool S3Options::Equals(const S3Options& other) const {
313 return (region == other.region && endpoint_override == other.endpoint_override &&
314 scheme == other.scheme && background_writes == other.background_writes &&
315 GetAccessKey() == other.GetAccessKey() &&
316 GetSecretKey() == other.GetSecretKey() &&
317 GetSessionToken() == other.GetSessionToken());
318 }
319
320 namespace {
321
Status CheckS3Initialized() {
323 if (!aws_initialized.load()) {
324 return Status::Invalid(
325 "S3 subsystem not initialized; please call InitializeS3() "
326 "before carrying out any S3-related operation");
327 }
328 return Status::OK();
329 }
330
331 // XXX Sanitize paths by removing leading slash?
332
333 struct S3Path {
334 std::string full_path;
335 std::string bucket;
336 std::string key;
337 std::vector<std::string> key_parts;
338
static Result<S3Path> FromString(const std::string& s) {
340 const auto src = internal::RemoveTrailingSlash(s);
341 auto first_sep = src.find_first_of(kSep);
342 if (first_sep == 0) {
343 return Status::Invalid("Path cannot start with a separator ('", s, "')");
344 }
345 if (first_sep == std::string::npos) {
346 return S3Path{std::string(src), std::string(src), "", {}};
347 }
348 S3Path path;
349 path.full_path = std::string(src);
350 path.bucket = std::string(src.substr(0, first_sep));
351 path.key = std::string(src.substr(first_sep + 1));
352 path.key_parts = internal::SplitAbstractPath(path.key);
353 RETURN_NOT_OK(Validate(&path));
354 return path;
355 }
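// For example (a sketch), FromString("some-bucket/some/key") yields
// bucket="some-bucket", key="some/key", key_parts={"some", "key"},
// while FromString("some-bucket") yields an empty key.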
356
static Status Validate(const S3Path* path) {
358 auto result = internal::ValidateAbstractPathParts(path->key_parts);
359 if (!result.ok()) {
360 return Status::Invalid(result.message(), " in path ", path->full_path);
361 } else {
362 return result;
363 }
364 }
365
Aws::String ToURLEncodedAwsString() const {
367 // URL-encode individual parts, not the '/' separator
368 Aws::String res;
369 res += internal::ToURLEncodedAwsString(bucket);
370 for (const auto& part : key_parts) {
371 res += kSep;
372 res += internal::ToURLEncodedAwsString(part);
373 }
374 return res;
375 }
376
S3Path parent() const {
378 DCHECK(!key_parts.empty());
379 auto parent = S3Path{"", bucket, "", key_parts};
380 parent.key_parts.pop_back();
381 parent.key = internal::JoinAbstractPath(parent.key_parts);
382 parent.full_path = parent.bucket + kSep + parent.key;
383 return parent;
384 }
385
bool has_parent() const { return !key.empty(); }
387
bool empty() const { return bucket.empty() && key.empty(); }
389
bool operator==(const S3Path& other) const {
391 return bucket == other.bucket && key == other.key;
392 }
393 };
394
395 // XXX return in OutcomeToStatus instead?
Status PathNotFound(const S3Path& path) {
397 return ::arrow::fs::internal::PathNotFound(path.full_path);
398 }
399
Status PathNotFound(const std::string& bucket, const std::string& key) {
401 return ::arrow::fs::internal::PathNotFound(bucket + kSep + key);
402 }
403
Status NotAFile(const S3Path& path) {
405 return ::arrow::fs::internal::NotAFile(path.full_path);
406 }
407
Status ValidateFilePath(const S3Path& path) {
409 if (path.bucket.empty() || path.key.empty()) {
410 return NotAFile(path);
411 }
412 return Status::OK();
413 }
414
std::string FormatRange(int64_t start, int64_t length) {
416 // Format a HTTP range header value
417 std::stringstream ss;
418 ss << "bytes=" << start << "-" << start + length - 1;
419 return ss.str();
420 }
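// For example, FormatRange(10, 5) returns "bytes=10-14" (an inclusive range).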
421
422 class S3Client : public Aws::S3::S3Client {
423 public:
424 using Aws::S3::S3Client::S3Client;
425
426 // To get a bucket's region, we must extract the "x-amz-bucket-region" header
427 // from the response to a HEAD bucket request.
428 // Unfortunately, the S3Client APIs don't let us access the headers of successful
// responses. So we have to cook up an AWS request and issue it ourselves.
430
Result<std::string> GetBucketRegion(const S3Model::HeadBucketRequest& request) {
432 auto uri = GeneratePresignedUrl(request.GetBucket(),
433 /*key=*/"", Aws::Http::HttpMethod::HTTP_HEAD);
434 // NOTE: The signer region argument isn't passed here, as there's no easy
435 // way of computing it (the relevant method is private).
436 auto outcome = MakeRequest(uri, request, Aws::Http::HttpMethod::HTTP_HEAD,
437 Aws::Auth::SIGV4_SIGNER);
438 const auto code = outcome.IsSuccess() ? outcome.GetResult().GetResponseCode()
439 : outcome.GetError().GetResponseCode();
440 const auto& headers = outcome.IsSuccess()
441 ? outcome.GetResult().GetHeaderValueCollection()
442 : outcome.GetError().GetResponseHeaders();
443
444 const auto it = headers.find(ToAwsString("x-amz-bucket-region"));
445 if (it == headers.end()) {
446 if (code == Aws::Http::HttpResponseCode::NOT_FOUND) {
447 return Status::IOError("Bucket '", request.GetBucket(), "' not found");
448 } else if (!outcome.IsSuccess()) {
449 return ErrorToStatus(std::forward_as_tuple("When resolving region for bucket '",
450 request.GetBucket(), "': "),
451 outcome.GetError());
452 } else {
453 return Status::IOError("When resolving region for bucket '", request.GetBucket(),
454 "': missing 'x-amz-bucket-region' header in response");
455 }
456 }
457 return std::string(FromAwsString(it->second));
458 }
459
Result<std::string> GetBucketRegion(const std::string& bucket) {
461 S3Model::HeadBucketRequest req;
462 req.SetBucket(ToAwsString(bucket));
463 return GetBucketRegion(req);
464 }
465 };
466
467 // In AWS SDK < 1.8, Aws::Client::ClientConfiguration::followRedirects is a bool.
468 template <bool Never = false>
void DisableRedirectsImpl(bool* followRedirects) {
470 *followRedirects = false;
471 }
472
473 // In AWS SDK >= 1.8, it's a Aws::Client::FollowRedirectsPolicy scoped enum.
474 template <typename PolicyEnum, PolicyEnum Never = PolicyEnum::NEVER>
void DisableRedirectsImpl(PolicyEnum* followRedirects) {
476 *followRedirects = Never;
477 }
478
void DisableRedirects(Aws::Client::ClientConfiguration* c) {
480 DisableRedirectsImpl(&c->followRedirects);
481 }
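// Overload resolution picks the right implementation for the SDK in use:
// with a bool member the first overload matches, with the scoped enum the
// second one does, so the same call compiles against both SDK versions.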
482
483 class ClientBuilder {
484 public:
explicit ClientBuilder(S3Options options) : options_(std::move(options)) {}
486
const Aws::Client::ClientConfiguration& config() const { return client_config_; }
488
Aws::Client::ClientConfiguration* mutable_config() { return &client_config_; }
490
Result<std::unique_ptr<S3Client>> BuildClient() {
492 credentials_provider_ = options_.credentials_provider;
493 if (!options_.region.empty()) {
494 client_config_.region = ToAwsString(options_.region);
495 }
496 client_config_.endpointOverride = ToAwsString(options_.endpoint_override);
497 if (options_.scheme == "http") {
498 client_config_.scheme = Aws::Http::Scheme::HTTP;
499 } else if (options_.scheme == "https") {
500 client_config_.scheme = Aws::Http::Scheme::HTTPS;
501 } else {
502 return Status::Invalid("Invalid S3 connection scheme '", options_.scheme, "'");
503 }
504 client_config_.retryStrategy = std::make_shared<ConnectRetryStrategy>();
505 if (!internal::global_options.tls_ca_file_path.empty()) {
506 client_config_.caFile = ToAwsString(internal::global_options.tls_ca_file_path);
507 }
508 if (!internal::global_options.tls_ca_dir_path.empty()) {
509 client_config_.caPath = ToAwsString(internal::global_options.tls_ca_dir_path);
510 }
511
512 const bool use_virtual_addressing = options_.endpoint_override.empty();
513 return std::unique_ptr<S3Client>(
514 new S3Client(credentials_provider_, client_config_,
515 Aws::Client::AWSAuthV4Signer::PayloadSigningPolicy::Never,
516 use_virtual_addressing));
517 }
518
const S3Options& options() const { return options_; }
520
521 protected:
522 S3Options options_;
523 Aws::Client::ClientConfiguration client_config_;
524 std::shared_ptr<Aws::Auth::AWSCredentialsProvider> credentials_provider_;
525 };
526
527 // -----------------------------------------------------------------------
528 // S3 region resolver
529
530 class RegionResolver {
531 public:
static Result<std::shared_ptr<RegionResolver>> Make(S3Options options) {
533 std::shared_ptr<RegionResolver> resolver(new RegionResolver(std::move(options)));
534 RETURN_NOT_OK(resolver->Init());
535 return resolver;
536 }
537
static Result<std::shared_ptr<RegionResolver>> DefaultInstance() {
539 static std::shared_ptr<RegionResolver> instance;
540 auto resolver = arrow::internal::atomic_load(&instance);
541 if (resolver) {
542 return resolver;
543 }
544 auto maybe_resolver = Make(S3Options::Anonymous());
545 if (!maybe_resolver.ok()) {
546 return maybe_resolver;
547 }
548 // Make sure to always return the same instance even if several threads
549 // call DefaultInstance at once.
550 std::shared_ptr<RegionResolver> existing;
551 if (arrow::internal::atomic_compare_exchange_strong(&instance, &existing,
552 *maybe_resolver)) {
553 return *maybe_resolver;
554 } else {
555 return existing;
556 }
557 }
558
Result<std::string> ResolveRegion(const std::string& bucket) {
560 std::unique_lock<std::mutex> lock(cache_mutex_);
561 auto it = cache_.find(bucket);
562 if (it != cache_.end()) {
563 return it->second;
564 }
565 lock.unlock();
566 ARROW_ASSIGN_OR_RAISE(auto region, ResolveRegionUncached(bucket));
567 lock.lock();
568 // Note we don't cache a non-existent bucket, as the bucket could be created later
569 cache_[bucket] = region;
570 return region;
571 }
572
Result<std::string> ResolveRegionUncached(const std::string& bucket) {
574 return client_->GetBucketRegion(bucket);
575 }
576
577 protected:
explicit RegionResolver(S3Options options) : builder_(std::move(options)) {}
579
Status Init() {
581 DCHECK(builder_.options().endpoint_override.empty());
582 // On Windows with AWS SDK >= 1.8, it is necessary to disable redirects (ARROW-10085).
583 DisableRedirects(builder_.mutable_config());
584 return builder_.BuildClient().Value(&client_);
585 }
586
587 ClientBuilder builder_;
588 std::unique_ptr<S3Client> client_;
589
590 std::mutex cache_mutex_;
591 // XXX Should cache size be bounded? It must be quite unusual to query millions
592 // of different buckets in a single program invocation...
593 std::unordered_map<std::string, std::string> cache_;
594 };
595
596 // -----------------------------------------------------------------------
597 // S3 file stream implementations
598
599 // A non-copying iostream.
600 // See https://stackoverflow.com/questions/35322033/aws-c-sdk-uploadpart-times-out
601 // https://stackoverflow.com/questions/13059091/creating-an-input-stream-from-constant-memory
602 class StringViewStream : Aws::Utils::Stream::PreallocatedStreamBuf, public std::iostream {
603 public:
StringViewStream(const void* data, int64_t nbytes)
605 : Aws::Utils::Stream::PreallocatedStreamBuf(
606 reinterpret_cast<unsigned char*>(const_cast<void*>(data)),
607 static_cast<size_t>(nbytes)),
608 std::iostream(this) {}
609 };
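// StringViewStream is used below both to upload caller-provided buffers
// without copying (see ObjectOutputStream::UploadPart) and, through
// AwsWriteableStreamFactory, to let GetObject write directly into a
// caller-provided output buffer.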
610
611 // By default, the AWS SDK reads object data into an auto-growing StringStream.
612 // To avoid copies, read directly into our preallocated buffer instead.
613 // See https://github.com/aws/aws-sdk-cpp/issues/64 for an alternative but
614 // functionally similar recipe.
Aws::IOStreamFactory AwsWriteableStreamFactory(void* data, int64_t nbytes) {
616 return [=]() { return Aws::New<StringViewStream>("", data, nbytes); };
617 }
618
Result<S3Model::GetObjectResult> GetObjectRange(Aws::S3::S3Client* client,
620 const S3Path& path, int64_t start,
621 int64_t length, void* out) {
622 S3Model::GetObjectRequest req;
623 req.SetBucket(ToAwsString(path.bucket));
624 req.SetKey(ToAwsString(path.key));
625 req.SetRange(ToAwsString(FormatRange(start, length)));
626 req.SetResponseStreamFactory(AwsWriteableStreamFactory(out, length));
627 return OutcomeToResult(client->GetObject(req));
628 }
629
630 // A RandomAccessFile that reads from a S3 object
631 class ObjectInputFile final : public io::RandomAccessFile {
632 public:
ObjectInputFile(std::shared_ptr<FileSystem> fs, Aws::S3::S3Client* client,
634 const S3Path& path, int64_t size = kNoSize)
635 : fs_(std::move(fs)), client_(client), path_(path), content_length_(size) {}
636
Status Init() {
638 // Issue a HEAD Object to get the content-length and ensure any
639 // errors (e.g. file not found) don't wait until the first Read() call.
640 if (content_length_ != kNoSize) {
641 DCHECK_GE(content_length_, 0);
642 return Status::OK();
643 }
644
645 S3Model::HeadObjectRequest req;
646 req.SetBucket(ToAwsString(path_.bucket));
647 req.SetKey(ToAwsString(path_.key));
648
649 auto outcome = client_->HeadObject(req);
650 if (!outcome.IsSuccess()) {
651 if (IsNotFound(outcome.GetError())) {
652 return PathNotFound(path_);
653 } else {
654 return ErrorToStatus(
655 std::forward_as_tuple("When reading information for key '", path_.key,
656 "' in bucket '", path_.bucket, "': "),
657 outcome.GetError());
658 }
659 }
660 content_length_ = outcome.GetResult().GetContentLength();
661 DCHECK_GE(content_length_, 0);
662 return Status::OK();
663 }
664
Status CheckClosed() const {
666 if (closed_) {
667 return Status::Invalid("Operation on closed stream");
668 }
669 return Status::OK();
670 }
671
Status CheckPosition(int64_t position, const char* action) const {
673 if (position < 0) {
674 return Status::Invalid("Cannot ", action, " from negative position");
675 }
676 if (position > content_length_) {
677 return Status::IOError("Cannot ", action, " past end of file");
678 }
679 return Status::OK();
680 }
681
682 // RandomAccessFile APIs
683
Status Close() override {
685 fs_.reset();
686 client_ = nullptr;
687 closed_ = true;
688 return Status::OK();
689 }
690
bool closed() const override { return closed_; }
692
Result<int64_t> Tell() const override {
694 RETURN_NOT_OK(CheckClosed());
695 return pos_;
696 }
697
Result<int64_t> GetSize() override {
699 RETURN_NOT_OK(CheckClosed());
700 return content_length_;
701 }
702
Status Seek(int64_t position) override {
704 RETURN_NOT_OK(CheckClosed());
705 RETURN_NOT_OK(CheckPosition(position, "seek"));
706
707 pos_ = position;
708 return Status::OK();
709 }
710
Result<int64_t> ReadAt(int64_t position, int64_t nbytes, void* out) override {
712 RETURN_NOT_OK(CheckClosed());
713 RETURN_NOT_OK(CheckPosition(position, "read"));
714
715 nbytes = std::min(nbytes, content_length_ - position);
716 if (nbytes == 0) {
717 return 0;
718 }
719
720 // Read the desired range of bytes
721 ARROW_ASSIGN_OR_RAISE(S3Model::GetObjectResult result,
722 GetObjectRange(client_, path_, position, nbytes, out));
723
724 auto& stream = result.GetBody();
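// The body has already been written into `out` through the stream created by
// AwsWriteableStreamFactory; ignore() followed by gcount() merely counts how
// many bytes were actually received.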
725 stream.ignore(nbytes);
// NOTE: the stream is a stringstream by default, so there is no actual error
727 // to check for. However, stream.fail() may return true if EOF is reached.
728 return stream.gcount();
729 }
730
Result<std::shared_ptr<Buffer>> ReadAt(int64_t position, int64_t nbytes) override {
732 RETURN_NOT_OK(CheckClosed());
733 RETURN_NOT_OK(CheckPosition(position, "read"));
734
735 // No need to allocate more than the remaining number of bytes
736 nbytes = std::min(nbytes, content_length_ - position);
737
738 ARROW_ASSIGN_OR_RAISE(auto buf, AllocateResizableBuffer(nbytes));
739 if (nbytes > 0) {
740 ARROW_ASSIGN_OR_RAISE(int64_t bytes_read,
741 ReadAt(position, nbytes, buf->mutable_data()));
742 DCHECK_LE(bytes_read, nbytes);
743 RETURN_NOT_OK(buf->Resize(bytes_read));
744 }
745 return std::move(buf);
746 }
747
Result<int64_t> Read(int64_t nbytes, void* out) override {
749 ARROW_ASSIGN_OR_RAISE(int64_t bytes_read, ReadAt(pos_, nbytes, out));
750 pos_ += bytes_read;
751 return bytes_read;
752 }
753
Result<std::shared_ptr<Buffer>> Read(int64_t nbytes) override {
755 ARROW_ASSIGN_OR_RAISE(auto buffer, ReadAt(pos_, nbytes));
756 pos_ += buffer->size();
757 return std::move(buffer);
758 }
759
760 protected:
761 std::shared_ptr<FileSystem> fs_; // Owner of S3Client
762 Aws::S3::S3Client* client_;
763 S3Path path_;
764 bool closed_ = false;
765 int64_t pos_ = 0;
766 int64_t content_length_ = kNoSize;
767 };
768
769 // Minimum size for each part of a multipart upload, except for the last part.
770 // AWS doc says "5 MB" but it's not clear whether those are MB or MiB,
771 // so I chose the safer value.
772 // (see https://docs.aws.amazon.com/AmazonS3/latest/API/mpUploadUploadPart.html)
773 static constexpr int64_t kMinimumPartUpload = 5 * 1024 * 1024;
774
775 // An OutputStream that writes to a S3 object
776 class ObjectOutputStream final : public io::OutputStream {
777 protected:
778 struct UploadState;
779
780 public:
ObjectOutputStream(std::shared_ptr<FileSystem> fs, Aws::S3::S3Client* client,
782 const S3Path& path, const S3Options& options)
783 : fs_(std::move(fs)), client_(client), path_(path), options_(options) {}
784
~ObjectOutputStream() override {
786 // For compliance with the rest of the IO stack, Close rather than Abort,
787 // even though it may be more expensive.
788 io::internal::CloseFromDestructor(this);
789 }
790
Status Init() {
792 // Initiate the multi-part upload
793 S3Model::CreateMultipartUploadRequest req;
794 req.SetBucket(ToAwsString(path_.bucket));
795 req.SetKey(ToAwsString(path_.key));
796
797 auto outcome = client_->CreateMultipartUpload(req);
798 if (!outcome.IsSuccess()) {
799 return ErrorToStatus(
std::forward_as_tuple("When initiating multipart upload for key '",
801 path_.key, "' in bucket '", path_.bucket, "': "),
802 outcome.GetError());
803 }
804 upload_id_ = outcome.GetResult().GetUploadId();
805 upload_state_ = std::make_shared<UploadState>();
806 closed_ = false;
807 return Status::OK();
808 }
809
Status Abort() override {
811 if (closed_) {
812 return Status::OK();
813 }
814
815 S3Model::AbortMultipartUploadRequest req;
816 req.SetBucket(ToAwsString(path_.bucket));
817 req.SetKey(ToAwsString(path_.key));
818 req.SetUploadId(upload_id_);
819
820 auto outcome = client_->AbortMultipartUpload(req);
821 if (!outcome.IsSuccess()) {
822 return ErrorToStatus(
std::forward_as_tuple("When aborting multipart upload for key '", path_.key,
824 "' in bucket '", path_.bucket, "': "),
825 outcome.GetError());
826 }
827 current_part_.reset();
828 fs_.reset();
829 client_ = nullptr;
830 closed_ = true;
831 return Status::OK();
832 }
833
834 // OutputStream interface
835
Status Close() override {
837 if (closed_) {
838 return Status::OK();
839 }
840
841 if (current_part_) {
842 // Upload last part
843 RETURN_NOT_OK(CommitCurrentPart());
844 }
845
// S3 mandates at least one part, so upload an empty one if necessary
847 if (part_number_ == 1) {
848 RETURN_NOT_OK(UploadPart("", 0));
849 }
850
851 // Wait for in-progress uploads to finish (if async writes are enabled)
852 RETURN_NOT_OK(Flush());
853
854 // At this point, all part uploads have finished successfully
855 DCHECK_GT(part_number_, 1);
856 DCHECK_EQ(upload_state_->completed_parts.size(),
857 static_cast<size_t>(part_number_ - 1));
858
859 S3Model::CompletedMultipartUpload completed_upload;
860 completed_upload.SetParts(upload_state_->completed_parts);
861 S3Model::CompleteMultipartUploadRequest req;
862 req.SetBucket(ToAwsString(path_.bucket));
863 req.SetKey(ToAwsString(path_.key));
864 req.SetUploadId(upload_id_);
865 req.SetMultipartUpload(std::move(completed_upload));
866
867 auto outcome = client_->CompleteMultipartUpload(req);
868 if (!outcome.IsSuccess()) {
869 return ErrorToStatus(
std::forward_as_tuple("When completing multipart upload for key '",
871 path_.key, "' in bucket '", path_.bucket, "': "),
872 outcome.GetError());
873 }
874
875 fs_.reset();
876 client_ = nullptr;
877 closed_ = true;
878 return Status::OK();
879 }
880
bool closed() const override { return closed_; }
882
Result<int64_t> Tell() const override {
884 if (closed_) {
885 return Status::Invalid("Operation on closed stream");
886 }
887 return pos_;
888 }
889
Status Write(const std::shared_ptr<Buffer>& buffer) override {
891 return DoWrite(buffer->data(), buffer->size(), buffer);
892 }
893
Status Write(const void* data, int64_t nbytes) override {
895 return DoWrite(data, nbytes);
896 }
897
Status DoWrite(const void* data, int64_t nbytes,
899 std::shared_ptr<Buffer> owned_buffer = nullptr) {
900 if (closed_) {
901 return Status::Invalid("Operation on closed stream");
902 }
903
904 if (!current_part_ && nbytes >= part_upload_threshold_) {
905 // No current part and data large enough, upload it directly
906 // (without copying if the buffer is owned)
907 RETURN_NOT_OK(UploadPart(data, nbytes, owned_buffer));
908 pos_ += nbytes;
909 return Status::OK();
910 }
911 // Can't upload data on its own, need to buffer it
912 if (!current_part_) {
913 ARROW_ASSIGN_OR_RAISE(current_part_,
914 io::BufferOutputStream::Create(part_upload_threshold_));
915 current_part_size_ = 0;
916 }
917 RETURN_NOT_OK(current_part_->Write(data, nbytes));
918 pos_ += nbytes;
919 current_part_size_ += nbytes;
920
921 if (current_part_size_ >= part_upload_threshold_) {
922 // Current part large enough, upload it
923 RETURN_NOT_OK(CommitCurrentPart());
924 }
925
926 return Status::OK();
927 }
928
Status Flush() override {
930 if (closed_) {
931 return Status::Invalid("Operation on closed stream");
932 }
933 // Wait for background writes to finish
934 std::unique_lock<std::mutex> lock(upload_state_->mutex);
935 upload_state_->cv.wait(lock,
936 [this]() { return upload_state_->parts_in_progress == 0; });
937 return upload_state_->status;
938 }
939
940 // Upload-related helpers
941
Status CommitCurrentPart() {
943 ARROW_ASSIGN_OR_RAISE(auto buf, current_part_->Finish());
944 current_part_.reset();
945 current_part_size_ = 0;
946 return UploadPart(buf);
947 }
948
Status UploadPart(std::shared_ptr<Buffer> buffer) {
950 return UploadPart(buffer->data(), buffer->size(), buffer);
951 }
952
Status UploadPart(const void* data, int64_t nbytes,
954 std::shared_ptr<Buffer> owned_buffer = nullptr) {
955 S3Model::UploadPartRequest req;
956 req.SetBucket(ToAwsString(path_.bucket));
957 req.SetKey(ToAwsString(path_.key));
958 req.SetUploadId(upload_id_);
959 req.SetPartNumber(part_number_);
960 req.SetContentLength(nbytes);
961
962 if (!options_.background_writes) {
963 req.SetBody(std::make_shared<StringViewStream>(data, nbytes));
964 auto outcome = client_->UploadPart(req);
965 if (!outcome.IsSuccess()) {
966 return UploadPartError(req, outcome);
967 } else {
968 AddCompletedPart(upload_state_, part_number_, outcome.GetResult());
969 }
970 } else {
971 std::unique_lock<std::mutex> lock(upload_state_->mutex);
972 auto state = upload_state_; // Keep upload state alive in closure
973 auto part_number = part_number_;
974
975 // If the data isn't owned, make an immutable copy for the lifetime of the closure
976 if (owned_buffer == nullptr) {
977 ARROW_ASSIGN_OR_RAISE(owned_buffer, AllocateBuffer(nbytes));
978 memcpy(owned_buffer->mutable_data(), data, nbytes);
979 } else {
980 DCHECK_EQ(data, owned_buffer->data());
981 DCHECK_EQ(nbytes, owned_buffer->size());
982 }
983 req.SetBody(
984 std::make_shared<StringViewStream>(owned_buffer->data(), owned_buffer->size()));
985
986 auto handler =
987 [state, owned_buffer, part_number](
988 const Aws::S3::S3Client*, const S3Model::UploadPartRequest& req,
989 const S3Model::UploadPartOutcome& outcome,
990 const std::shared_ptr<const Aws::Client::AsyncCallerContext>&) -> void {
991 std::unique_lock<std::mutex> lock(state->mutex);
992 if (!outcome.IsSuccess()) {
993 state->status &= UploadPartError(req, outcome);
994 } else {
995 AddCompletedPart(state, part_number, outcome.GetResult());
996 }
997 // Notify completion, regardless of success / error status
998 if (--state->parts_in_progress == 0) {
999 state->cv.notify_all();
1000 }
1001 };
1002 ++upload_state_->parts_in_progress;
1003 client_->UploadPartAsync(req, handler);
1004 }
1005
1006 ++part_number_;
1007 // With up to 10000 parts in an upload (S3 limit), a stream writing chunks
1008 // of exactly 5MB would be limited to 50GB total. To avoid that, we bump
1009 // the upload threshold every 100 parts. So the pattern is:
1010 // - part 1 to 99: 5MB threshold
1011 // - part 100 to 199: 10MB threshold
1012 // - part 200 to 299: 15MB threshold
1013 // ...
1014 // - part 9900 to 9999: 500MB threshold
1015 // So the total size limit is 2475000MB or ~2.4TB, while keeping manageable
1016 // chunk sizes and avoiding too much buffering in the common case of a small-ish
1017 // stream. If the limit's not enough, we can revisit.
1018 if (part_number_ % 100 == 0) {
1019 part_upload_threshold_ += kMinimumPartUpload;
1020 }
1021
1022 return Status::OK();
1023 }
1024
static void AddCompletedPart(const std::shared_ptr<UploadState>& state, int part_number,
1026 const S3Model::UploadPartResult& result) {
1027 S3Model::CompletedPart part;
1028 // Append ETag and part number for this uploaded part
1029 // (will be needed for upload completion in Close())
1030 part.SetPartNumber(part_number);
1031 part.SetETag(result.GetETag());
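// With background writes, parts may complete out of order, so each part is
// stored at its slot (part_number - 1), growing the vector as needed.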
1032 int slot = part_number - 1;
1033 if (state->completed_parts.size() <= static_cast<size_t>(slot)) {
1034 state->completed_parts.resize(slot + 1);
1035 }
1036 DCHECK(!state->completed_parts[slot].PartNumberHasBeenSet());
1037 state->completed_parts[slot] = std::move(part);
1038 }
1039
static Status UploadPartError(const S3Model::UploadPartRequest& req,
1041 const S3Model::UploadPartOutcome& outcome) {
1042 return ErrorToStatus(
1043 std::forward_as_tuple("When uploading part for key '", req.GetKey(),
1044 "' in bucket '", req.GetBucket(), "': "),
1045 outcome.GetError());
1046 }
1047
1048 protected:
1049 std::shared_ptr<FileSystem> fs_; // Owner of S3Client
1050 Aws::S3::S3Client* client_;
1051 S3Path path_;
1052 const S3Options& options_;
1053 Aws::String upload_id_;
1054 bool closed_ = true;
1055 int64_t pos_ = 0;
1056 int32_t part_number_ = 1;
1057 std::shared_ptr<io::BufferOutputStream> current_part_;
1058 int64_t current_part_size_ = 0;
1059 int64_t part_upload_threshold_ = kMinimumPartUpload;
1060
1061 // This struct is kept alive through background writes to avoid problems
1062 // in the completion handler.
1063 struct UploadState {
1064 std::mutex mutex;
1065 std::condition_variable cv;
1066 Aws::Vector<S3Model::CompletedPart> completed_parts;
1067 int64_t parts_in_progress = 0;
1068 Status status;
1069
UploadState() : status(Status::OK()) {}
1071 };
1072 std::shared_ptr<UploadState> upload_state_;
1073 };
1074
1075 // This function assumes info->path() is already set
void FileObjectToInfo(const S3Model::HeadObjectResult& obj, FileInfo* info) {
1077 info->set_type(FileType::File);
1078 info->set_size(static_cast<int64_t>(obj.GetContentLength()));
1079 info->set_mtime(FromAwsDatetime(obj.GetLastModified()));
1080 }
1081
void FileObjectToInfo(const S3Model::Object& obj, FileInfo* info) {
1083 info->set_type(FileType::File);
1084 info->set_size(static_cast<int64_t>(obj.GetSize()));
1085 info->set_mtime(FromAwsDatetime(obj.GetLastModified()));
1086 }
1087
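// Walks a bucket subtree asynchronously: each level issues a
// ListObjectsV2Async call (plus continuation requests for truncated results),
// feeds every page of results to result_handler_, and recurses into common
// prefixes when recursion_handler_ allows it. error_handler_ decides whether
// an AWS error aborts the walk or is tolerated (e.g. "not found" when the
// caller allows it).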
1088 struct TreeWalker : public std::enable_shared_from_this<TreeWalker> {
1089 using ResultHandler = std::function<Status(const std::string& prefix,
1090 const S3Model::ListObjectsV2Result&)>;
1091 using ErrorHandler = std::function<Status(const AWSError<S3Errors>& error)>;
1092 using RecursionHandler = std::function<Result<bool>(int32_t nesting_depth)>;
1093
1094 Aws::S3::S3Client* client_;
1095 const std::string bucket_;
1096 const std::string base_dir_;
1097 const int32_t max_keys_;
1098 const ResultHandler result_handler_;
1099 const ErrorHandler error_handler_;
1100 const RecursionHandler recursion_handler_;
1101
1102 template <typename... Args>
static Status Walk(Args&&... args) {
1104 auto self = std::make_shared<TreeWalker>(std::forward<Args>(args)...);
1105 return self->DoWalk();
1106 }
1107
TreeWalker(Aws::S3::S3Client* client, std::string bucket, std::string base_dir,
1109 int32_t max_keys, ResultHandler result_handler, ErrorHandler error_handler,
1110 RecursionHandler recursion_handler)
1111 : client_(std::move(client)),
1112 bucket_(std::move(bucket)),
1113 base_dir_(std::move(base_dir)),
1114 max_keys_(max_keys),
1115 result_handler_(std::move(result_handler)),
1116 error_handler_(std::move(error_handler)),
1117 recursion_handler_(std::move(recursion_handler)) {}
1118
1119 private:
1120 std::mutex mutex_;
1121 Future<> future_;
1122 std::atomic<int32_t> num_in_flight_;
1123
Status DoWalk() {
1125 future_ = decltype(future_)::Make();
1126 num_in_flight_ = 0;
1127 WalkChild(base_dir_, /*nesting_depth=*/0);
1128 // When this returns, ListObjectsV2 tasks either have finished or will exit early
1129 return future_.status();
1130 }
1131
bool is_finished() const { return future_.is_finished(); }
1133
void ListObjectsFinished(Status st) {
1135 const auto in_flight = --num_in_flight_;
1136 if (!st.ok() || !in_flight) {
1137 future_.MarkFinished(std::move(st));
1138 }
1139 }
1140
1141 struct ListObjectsV2Handler {
1142 std::shared_ptr<TreeWalker> walker;
1143 std::string prefix;
1144 int32_t nesting_depth;
1145 S3Model::ListObjectsV2Request req;
1146
void operator()(const Aws::S3::S3Client*, const S3Model::ListObjectsV2Request&,
1148 const S3Model::ListObjectsV2Outcome& outcome,
1149 const std::shared_ptr<const Aws::Client::AsyncCallerContext>&) {
1150 // Serialize calls to operation-specific handlers
1151 std::unique_lock<std::mutex> guard(walker->mutex_);
1152 if (walker->is_finished()) {
1153 // Early exit: avoid executing handlers if DoWalk() returned
1154 return;
1155 }
1156 if (!outcome.IsSuccess()) {
1157 Status st = walker->error_handler_(outcome.GetError());
1158 walker->ListObjectsFinished(std::move(st));
1159 return;
1160 }
1161 HandleResult(outcome.GetResult());
1162 }
1163
void HandleResult(const S3Model::ListObjectsV2Result& result) {
1165 bool recurse = result.GetCommonPrefixes().size() > 0;
1166 if (recurse) {
1167 auto maybe_recurse = walker->recursion_handler_(nesting_depth + 1);
1168 if (!maybe_recurse.ok()) {
1169 walker->ListObjectsFinished(maybe_recurse.status());
1170 return;
1171 }
1172 recurse &= *maybe_recurse;
1173 }
1174 Status st = walker->result_handler_(prefix, result);
1175 if (!st.ok()) {
1176 walker->ListObjectsFinished(std::move(st));
1177 return;
1178 }
1179 if (recurse) {
1180 walker->WalkChildren(result, nesting_depth + 1);
1181 }
1182 // If the result was truncated, issue a continuation request to get
1183 // further directory entries.
1184 if (result.GetIsTruncated()) {
1185 DCHECK(!result.GetNextContinuationToken().empty());
1186 req.SetContinuationToken(result.GetNextContinuationToken());
1187 walker->client_->ListObjectsV2Async(req, *this);
1188 } else {
1189 walker->ListObjectsFinished(Status::OK());
1190 }
1191 }
1192
void Start() {
1194 req.SetBucket(ToAwsString(walker->bucket_));
1195 if (!prefix.empty()) {
1196 req.SetPrefix(ToAwsString(prefix) + kSep);
1197 }
1198 req.SetDelimiter(Aws::String() + kSep);
1199 req.SetMaxKeys(walker->max_keys_);
1200 walker->client_->ListObjectsV2Async(req, *this);
1201 }
1202 };
1203
void WalkChild(std::string key, int32_t nesting_depth) {
1205 ListObjectsV2Handler handler{shared_from_this(), std::move(key), nesting_depth, {}};
1206 ++num_in_flight_;
1207 handler.Start();
1208 }
1209
void WalkChildren(const S3Model::ListObjectsV2Result& result, int32_t nesting_depth) {
1211 for (const auto& prefix : result.GetCommonPrefixes()) {
1212 const auto child_key =
1213 internal::RemoveTrailingSlash(FromAwsString(prefix.GetPrefix()));
1214 WalkChild(std::string{child_key}, nesting_depth);
1215 }
1216 }
1217
1218 friend struct ListObjectsV2Handler;
1219 };
1220
1221 } // namespace
1222
1223 // -----------------------------------------------------------------------
1224 // S3 filesystem implementation
1225
1226 class S3FileSystem::Impl {
1227 public:
1228 ClientBuilder builder_;
1229 std::unique_ptr<Aws::S3::S3Client> client_;
1230 util::optional<S3Backend> backend_;
1231
1232 const int32_t kListObjectsMaxKeys = 1000;
1233 // At most 1000 keys per multiple-delete request
1234 const int32_t kMultipleDeleteMaxKeys = 1000;
// Limit recursion depth, since a recursion bomb can be created
1236 const int32_t kMaxNestingDepth = 100;
1237
explicit Impl(S3Options options) : builder_(std::move(options)) {}
1239
Status Init() { return builder_.BuildClient().Value(&client_); }
1241
const S3Options& options() const { return builder_.options(); }
1243
std::string region() const {
1245 return std::string(FromAwsString(builder_.config().region));
1246 }
1247
1248 template <typename Error>
void SaveBackend(const Aws::Client::AWSError<Error>& error) {
1250 if (!backend_ || *backend_ == S3Backend::Other) {
1251 backend_ = DetectS3Backend(error);
1252 }
1253 }
1254
1255 // Create a bucket. Successful if bucket already exists.
Status CreateBucket(const std::string& bucket) {
1257 S3Model::CreateBucketConfiguration config;
1258 S3Model::CreateBucketRequest req;
1259 config.SetLocationConstraint(
1260 S3Model::BucketLocationConstraintMapper::GetBucketLocationConstraintForName(
1261 ToAwsString(options().region)));
1262 req.SetBucket(ToAwsString(bucket));
1263 req.SetCreateBucketConfiguration(config);
1264
1265 auto outcome = client_->CreateBucket(req);
1266 if (!outcome.IsSuccess() && !IsAlreadyExists(outcome.GetError())) {
1267 return ErrorToStatus(std::forward_as_tuple("When creating bucket '", bucket, "': "),
1268 outcome.GetError());
1269 }
1270 return Status::OK();
1271 }
1272
1273 // Create an object with empty contents. Successful if object already exists.
Status CreateEmptyObject(const std::string& bucket, const std::string& key) {
1275 S3Model::PutObjectRequest req;
1276 req.SetBucket(ToAwsString(bucket));
1277 req.SetKey(ToAwsString(key));
1278 return OutcomeToStatus(
1279 std::forward_as_tuple("When creating key '", key, "' in bucket '", bucket, "': "),
1280 client_->PutObject(req));
1281 }
1282
Status CreateEmptyDir(const std::string& bucket, const std::string& key) {
1284 DCHECK(!key.empty());
1285 return CreateEmptyObject(bucket, key + kSep);
1286 }
1287
Status DeleteObject(const std::string& bucket, const std::string& key) {
1289 S3Model::DeleteObjectRequest req;
1290 req.SetBucket(ToAwsString(bucket));
1291 req.SetKey(ToAwsString(key));
1292 return OutcomeToStatus(
std::forward_as_tuple("When deleting key '", key, "' in bucket '", bucket, "': "),
1294 client_->DeleteObject(req));
1295 }
1296
Status CopyObject(const S3Path& src_path, const S3Path& dest_path) {
1298 S3Model::CopyObjectRequest req;
1299 req.SetBucket(ToAwsString(dest_path.bucket));
1300 req.SetKey(ToAwsString(dest_path.key));
1301 // Copy source "Must be URL-encoded" according to AWS SDK docs.
1302 req.SetCopySource(src_path.ToURLEncodedAwsString());
1303 return OutcomeToStatus(
1304 std::forward_as_tuple("When copying key '", src_path.key, "' in bucket '",
1305 src_path.bucket, "' to key '", dest_path.key,
1306 "' in bucket '", dest_path.bucket, "': "),
1307 client_->CopyObject(req));
1308 }
1309
1310 // On Minio, an empty "directory" doesn't satisfy the same API requests as
1311 // a non-empty "directory". This is a Minio-specific quirk, but we need
1312 // to handle it for unit testing.
1313
Status IsEmptyDirectory(const std::string& bucket, const std::string& key, bool* out) {
1315 S3Model::HeadObjectRequest req;
1316 req.SetBucket(ToAwsString(bucket));
1317 if (backend_ && *backend_ == S3Backend::Minio) {
1318 // Minio wants a slash at the end, Amazon doesn't
1319 req.SetKey(ToAwsString(key) + kSep);
1320 } else {
1321 req.SetKey(ToAwsString(key));
1322 }
1323
1324 auto outcome = client_->HeadObject(req);
1325 if (outcome.IsSuccess()) {
1326 *out = true;
1327 return Status::OK();
1328 }
1329 if (!backend_) {
1330 SaveBackend(outcome.GetError());
1331 DCHECK(backend_);
1332 if (*backend_ == S3Backend::Minio) {
1333 // Try again with separator-terminated key (see above)
1334 return IsEmptyDirectory(bucket, key, out);
1335 }
1336 }
1337 if (IsNotFound(outcome.GetError())) {
1338 *out = false;
1339 return Status::OK();
1340 }
1341 return ErrorToStatus(std::forward_as_tuple("When reading information for key '", key,
1342 "' in bucket '", bucket, "': "),
1343 outcome.GetError());
1344 }
1345
Status IsEmptyDirectory(const S3Path& path, bool* out) {
1347 return IsEmptyDirectory(path.bucket, path.key, out);
1348 }
1349
Status IsNonEmptyDirectory(const S3Path& path, bool* out) {
1351 S3Model::ListObjectsV2Request req;
1352 req.SetBucket(ToAwsString(path.bucket));
1353 req.SetPrefix(ToAwsString(path.key) + kSep);
1354 req.SetDelimiter(Aws::String() + kSep);
1355 req.SetMaxKeys(1);
1356 auto outcome = client_->ListObjectsV2(req);
1357 if (outcome.IsSuccess()) {
1358 *out = outcome.GetResult().GetKeyCount() > 0;
1359 return Status::OK();
1360 }
1361 if (IsNotFound(outcome.GetError())) {
1362 *out = false;
1363 return Status::OK();
1364 }
1365 return ErrorToStatus(
1366 std::forward_as_tuple("When listing objects under key '", path.key,
1367 "' in bucket '", path.bucket, "': "),
1368 outcome.GetError());
1369 }
1370
Status CheckNestingDepth(int32_t nesting_depth) {
1372 if (nesting_depth >= kMaxNestingDepth) {
1373 return Status::IOError("S3 filesystem tree exceeds maximum nesting depth (",
1374 kMaxNestingDepth, ")");
1375 }
1376 return Status::OK();
1377 }
1378
1379 // Workhorse for GetTargetStats(FileSelector...)
Status Walk(const FileSelector& select, const std::string& bucket,
1381 const std::string& key, std::vector<FileInfo>* out) {
1382 bool is_empty = true;
1383
1384 auto handle_error = [&](const AWSError<S3Errors>& error) -> Status {
1385 if (select.allow_not_found && IsNotFound(error)) {
1386 return Status::OK();
1387 }
1388 return ErrorToStatus(std::forward_as_tuple("When listing objects under key '", key,
1389 "' in bucket '", bucket, "': "),
1390 error);
1391 };
1392
1393 auto handle_recursion = [&](int32_t nesting_depth) -> Result<bool> {
1394 RETURN_NOT_OK(CheckNestingDepth(nesting_depth));
1395 return select.recursive && nesting_depth <= select.max_recursion;
1396 };
1397
1398 auto handle_results = [&](const std::string& prefix,
1399 const S3Model::ListObjectsV2Result& result) -> Status {
1400 // Walk "directories"
1401 for (const auto& prefix : result.GetCommonPrefixes()) {
1402 is_empty = false;
1403 const auto child_key =
1404 internal::RemoveTrailingSlash(FromAwsString(prefix.GetPrefix()));
1405 std::stringstream child_path;
1406 child_path << bucket << kSep << child_key;
1407 FileInfo info;
1408 info.set_path(child_path.str());
1409 info.set_type(FileType::Directory);
1410 out->push_back(std::move(info));
1411 }
1412 // Walk "files"
1413 for (const auto& obj : result.GetContents()) {
1414 is_empty = false;
1415 FileInfo info;
1416 const auto child_key = internal::RemoveTrailingSlash(FromAwsString(obj.GetKey()));
1417 if (child_key == util::string_view(prefix)) {
1418 // Amazon can return the "directory" key itself as part of the results, skip
1419 continue;
1420 }
1421 std::stringstream child_path;
1422 child_path << bucket << kSep << child_key;
1423 info.set_path(child_path.str());
1424 FileObjectToInfo(obj, &info);
1425 out->push_back(std::move(info));
1426 }
1427 return Status::OK();
1428 };
1429
1430 RETURN_NOT_OK(TreeWalker::Walk(client_.get(), bucket, key, kListObjectsMaxKeys,
1431 handle_results, handle_error, handle_recursion));
1432
1433 // If no contents were found, perhaps it's an empty "directory",
1434 // or perhaps it's a nonexistent entry. Check.
1435 if (is_empty && !select.allow_not_found) {
1436 RETURN_NOT_OK(IsEmptyDirectory(bucket, key, &is_empty));
1437 if (!is_empty) {
1438 return PathNotFound(bucket, key);
1439 }
1440 }
1441 // Sort results for convenience, since they can come massively out of order
1442 std::sort(out->begin(), out->end(), FileInfo::ByPath{});
1443 return Status::OK();
1444 }
1445
Status WalkForDeleteDir(const std::string& bucket, const std::string& key,
1447 std::vector<std::string>* file_keys,
1448 std::vector<std::string>* dir_keys) {
1449 auto handle_results = [&](const std::string& prefix,
1450 const S3Model::ListObjectsV2Result& result) -> Status {
1451 // Walk "files"
1452 file_keys->reserve(file_keys->size() + result.GetContents().size());
1453 for (const auto& obj : result.GetContents()) {
1454 file_keys->emplace_back(FromAwsString(obj.GetKey()));
1455 }
1456 // Walk "directories"
1457 dir_keys->reserve(dir_keys->size() + result.GetCommonPrefixes().size());
1458 for (const auto& prefix : result.GetCommonPrefixes()) {
1459 dir_keys->emplace_back(FromAwsString(prefix.GetPrefix()));
1460 }
1461 return Status::OK();
1462 };
1463
1464 auto handle_error = [&](const AWSError<S3Errors>& error) -> Status {
1465 return ErrorToStatus(std::forward_as_tuple("When listing objects under key '", key,
1466 "' in bucket '", bucket, "': "),
1467 error);
1468 };
1469
1470 auto handle_recursion = [&](int32_t nesting_depth) -> Result<bool> {
1471 RETURN_NOT_OK(CheckNestingDepth(nesting_depth));
1472 return true; // Recurse
1473 };
1474
1475 return TreeWalker::Walk(client_.get(), bucket, key, kListObjectsMaxKeys,
1476 handle_results, handle_error, handle_recursion);
1477 }
1478
1479 // Delete multiple objects at once
Status DeleteObjects(const std::string& bucket, const std::vector<std::string>& keys) {
1481 struct DeleteHandler {
1482 Future<> future = Future<>::Make();
1483
1484 // Callback for DeleteObjectsAsync
1485 void operator()(const Aws::S3::S3Client*, const S3Model::DeleteObjectsRequest& req,
1486 const S3Model::DeleteObjectsOutcome& outcome,
1487 const std::shared_ptr<const Aws::Client::AsyncCallerContext>&) {
1488 if (!outcome.IsSuccess()) {
1489 future.MarkFinished(ErrorToStatus(outcome.GetError()));
1490 return;
1491 }
1492 // Also need to check per-key errors, even on successful outcome
1493 // See
1494 // https://docs.aws.amazon.com/fr_fr/AmazonS3/latest/API/multiobjectdeleteapi.html
1495 const auto& errors = outcome.GetResult().GetErrors();
1496 if (!errors.empty()) {
1497 std::stringstream ss;
1498 ss << "Got the following " << errors.size()
1499 << " errors when deleting objects in S3 bucket '" << req.GetBucket()
1500 << "':\n";
1501 for (const auto& error : errors) {
1502 ss << "- key '" << error.GetKey() << "': " << error.GetMessage() << "\n";
1503 }
1504 future.MarkFinished(Status::IOError(ss.str()));
1505 } else {
1506 future.MarkFinished();
1507 }
1508 }
1509 };
1510
1511 const auto chunk_size = static_cast<size_t>(kMultipleDeleteMaxKeys);
1512 std::vector<DeleteHandler> delete_handlers;
1513 std::vector<Future<>*> futures;
1514 delete_handlers.reserve(keys.size() / chunk_size + 1);
1515 futures.reserve(delete_handlers.capacity());
1516
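// Issue one asynchronous DeleteObjects request per chunk of at most
// kMultipleDeleteMaxKeys keys (matching the S3 DeleteObjects per-request limit).
// The reserve() calls above prevent reallocation, so the pointers collected in
// `futures` stay valid while handlers are appended.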
1517 for (size_t start = 0; start < keys.size(); start += chunk_size) {
1518 S3Model::DeleteObjectsRequest req;
1519 S3Model::Delete del;
1520 for (size_t i = start; i < std::min(keys.size(), start + chunk_size); ++i) {
1521 del.AddObjects(S3Model::ObjectIdentifier().WithKey(ToAwsString(keys[i])));
1522 }
1523 req.SetBucket(ToAwsString(bucket));
1524 req.SetDelete(std::move(del));
1525 delete_handlers.emplace_back();
1526 futures.push_back(&delete_handlers.back().future);
1527 client_->DeleteObjectsAsync(req, delete_handlers.back());
1528 }
1529
1530 WaitForAll(futures);
1531 for (const auto* fut : futures) {
1532 RETURN_NOT_OK(fut->status());
1533 }
1534 return Status::OK();
1535 }
1536
1537 Status DeleteDirContents(const std::string& bucket, const std::string& key) {
1538 std::vector<std::string> file_keys;
1539 std::vector<std::string> dir_keys;
1540 RETURN_NOT_OK(WalkForDeleteDir(bucket, key, &file_keys, &dir_keys));
1541 if (file_keys.empty() && dir_keys.empty() && !key.empty()) {
1542 // No contents found, is it an empty directory?
1543 bool exists = false;
1544 RETURN_NOT_OK(IsEmptyDirectory(bucket, key, &exists));
1545 if (!exists) {
1546 return PathNotFound(bucket, key);
1547 }
1548 }
1549 // First delete all "files", then delete all child "directories"
1550 RETURN_NOT_OK(DeleteObjects(bucket, file_keys));
1551 // Delete directories in reverse lexicographic order, to ensure children
1552 // are deleted before their parents (needed at least for Minio).
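// e.g. "a/b/c/" is removed before "a/b/", which is removed before "a/".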
1553 std::sort(dir_keys.rbegin(), dir_keys.rend());
1554 return DeleteObjects(bucket, dir_keys);
1555 }
1556
1557 Status EnsureDirectoryExists(const S3Path& path) {
1558 if (!path.key.empty()) {
1559 return CreateEmptyDir(path.bucket, path.key);
1560 }
1561 return Status::OK();
1562 }
1563
1564 Status EnsureParentExists(const S3Path& path) {
1565 if (path.has_parent()) {
1566 return EnsureDirectoryExists(path.parent());
1567 }
1568 return Status::OK();
1569 }
1570
1571 Status ListBuckets(std::vector<std::string>* out) {
1572 out->clear();
1573 auto outcome = client_->ListBuckets();
1574 if (!outcome.IsSuccess()) {
1575 return ErrorToStatus(std::forward_as_tuple("When listing buckets: "),
1576 outcome.GetError());
1577 }
1578 for (const auto& bucket : outcome.GetResult().GetBuckets()) {
1579 out->emplace_back(FromAwsString(bucket.GetName()));
1580 }
1581 return Status::OK();
1582 }
1583
1584 Result<std::shared_ptr<ObjectInputFile>> OpenInputFile(const std::string& s,
1585 S3FileSystem* fs) {
1586 ARROW_ASSIGN_OR_RAISE(auto path, S3Path::FromString(s));
1587 RETURN_NOT_OK(ValidateFilePath(path));
1588
1589 auto ptr =
1590 std::make_shared<ObjectInputFile>(fs->shared_from_this(), client_.get(), path);
1591 RETURN_NOT_OK(ptr->Init());
1592 return ptr;
1593 }
1594
1595 Result<std::shared_ptr<ObjectInputFile>> OpenInputFile(const FileInfo& info,
1596 S3FileSystem* fs) {
1597 if (info.type() == FileType::NotFound) {
1598 return ::arrow::fs::internal::PathNotFound(info.path());
1599 }
1600 if (info.type() != FileType::File && info.type() != FileType::Unknown) {
1601 return ::arrow::fs::internal::NotAFile(info.path());
1602 }
1603
1604 ARROW_ASSIGN_OR_RAISE(auto path, S3Path::FromString(info.path()));
1605 RETURN_NOT_OK(ValidateFilePath(path));
1606
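// Pass the size already known from FileInfo so Init() can avoid another
// HeadObject round-trip.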
1607 auto ptr = std::make_shared<ObjectInputFile>(fs->shared_from_this(), client_.get(),
1608 path, info.size());
1609 RETURN_NOT_OK(ptr->Init());
1610 return ptr;
1611 }
1612 };
1613
1614 S3FileSystem::S3FileSystem(const S3Options& options) : impl_(new Impl{options}) {}
1615
1616 S3FileSystem::~S3FileSystem() {}
1617
1618 Result<std::shared_ptr<S3FileSystem>> S3FileSystem::Make(const S3Options& options) {
1619 RETURN_NOT_OK(CheckS3Initialized());
1620
1621 std::shared_ptr<S3FileSystem> ptr(new S3FileSystem(options));
1622 RETURN_NOT_OK(ptr->impl_->Init());
1623 return ptr;
1624 }
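// A minimal usage sketch (not part of this file; assumes S3 has been set up
// with InitializeS3() and `options` is a configured S3Options; the bucket and
// prefix below are hypothetical):
//
//   ARROW_ASSIGN_OR_RAISE(auto fs, S3FileSystem::Make(options));
//   FileSelector sel;
//   sel.base_dir = "my-bucket/some/prefix";
//   sel.recursive = true;
//   ARROW_ASSIGN_OR_RAISE(auto infos, fs->GetFileInfo(sel));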
1625
1626 bool S3FileSystem::Equals(const FileSystem& other) const {
1627 if (this == &other) {
1628 return true;
1629 }
1630 if (other.type_name() != type_name()) {
1631 return false;
1632 }
1633 const auto& s3fs = ::arrow::internal::checked_cast<const S3FileSystem&>(other);
1634 return options().Equals(s3fs.options());
1635 }
1636
1637 S3Options S3FileSystem::options() const { return impl_->options(); }
1638
1639 std::string S3FileSystem::region() const { return impl_->region(); }
1640
1641 Result<FileInfo> S3FileSystem::GetFileInfo(const std::string& s) {
1642 ARROW_ASSIGN_OR_RAISE(auto path, S3Path::FromString(s));
1643 FileInfo info;
1644 info.set_path(s);
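// Resolution order: the root path "" is always a directory; a bare bucket is
// probed with HeadBucket; otherwise the key is probed with HeadObject, then
// checked as an empty or non-empty "directory" before reporting NotFound.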
1645
1646 if (path.empty()) {
1647 // It's the root path ""
1648 info.set_type(FileType::Directory);
1649 return info;
1650 } else if (path.key.empty()) {
1651 // It's a bucket
1652 S3Model::HeadBucketRequest req;
1653 req.SetBucket(ToAwsString(path.bucket));
1654
1655 auto outcome = impl_->client_->HeadBucket(req);
1656 if (!outcome.IsSuccess()) {
1657 if (!IsNotFound(outcome.GetError())) {
1658 return ErrorToStatus(
1659 std::forward_as_tuple("When getting information for bucket '", path.bucket,
1660 "': "),
1661 outcome.GetError());
1662 }
1663 info.set_type(FileType::NotFound);
1664 return info;
1665 }
1666 // NOTE: S3 doesn't have a bucket modification time. Only a creation
1667 // time is available, and you have to list all buckets to get it.
1668 info.set_type(FileType::Directory);
1669 return info;
1670 } else {
1671 // It's an object
1672 S3Model::HeadObjectRequest req;
1673 req.SetBucket(ToAwsString(path.bucket));
1674 req.SetKey(ToAwsString(path.key));
1675
1676 auto outcome = impl_->client_->HeadObject(req);
1677 if (outcome.IsSuccess()) {
1678 // "File" object found
1679 FileObjectToInfo(outcome.GetResult(), &info);
1680 return info;
1681 }
1682 if (!IsNotFound(outcome.GetError())) {
1683 return ErrorToStatus(
1684 std::forward_as_tuple("When getting information for key '", path.key,
1685 "' in bucket '", path.bucket, "': "),
1686 outcome.GetError());
1687 }
1688 // Not found => perhaps it's an empty "directory"
1689 bool is_dir = false;
1690 RETURN_NOT_OK(impl_->IsEmptyDirectory(path, &is_dir));
1691 if (is_dir) {
1692 info.set_type(FileType::Directory);
1693 return info;
1694 }
1695 // Not found => perhaps it's a non-empty "directory"
1696 RETURN_NOT_OK(impl_->IsNonEmptyDirectory(path, &is_dir));
1697 if (is_dir) {
1698 info.set_type(FileType::Directory);
1699 } else {
1700 info.set_type(FileType::NotFound);
1701 }
1702 return info;
1703 }
1704 }
1705
1706 Result<std::vector<FileInfo>> S3FileSystem::GetFileInfo(const FileSelector& select) {
1707 ARROW_ASSIGN_OR_RAISE(auto base_path, S3Path::FromString(select.base_dir));
1708
1709 std::vector<FileInfo> results;
1710
1711 if (base_path.empty()) {
1712 // List all buckets
1713 std::vector<std::string> buckets;
1714 RETURN_NOT_OK(impl_->ListBuckets(&buckets));
1715 for (const auto& bucket : buckets) {
1716 FileInfo info;
1717 info.set_path(bucket);
1718 info.set_type(FileType::Directory);
1719 results.push_back(std::move(info));
1720 if (select.recursive) {
1721 RETURN_NOT_OK(impl_->Walk(select, bucket, "", &results));
1722 }
1723 }
1724 return results;
1725 }
1726
1727 // Nominal case -> walk a single bucket
1728 RETURN_NOT_OK(impl_->Walk(select, base_path.bucket, base_path.key, &results));
1729 return results;
1730 }
1731
1732 Status S3FileSystem::CreateDir(const std::string& s, bool recursive) {
1733 ARROW_ASSIGN_OR_RAISE(auto path, S3Path::FromString(s));
1734
1735 if (path.key.empty()) {
1736 // Create bucket
1737 return impl_->CreateBucket(path.bucket);
1738 }
1739
1740 // Create object
1741 if (recursive) {
1742 // Ensure bucket exists
1743 RETURN_NOT_OK(impl_->CreateBucket(path.bucket));
1744 // Ensure that all parents exist, then the directory itself
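// e.g. for key "a/b/c" this creates the empty objects "a/", "a/b/" and "a/b/c/".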
1745 std::string parent_key;
1746 for (const auto& part : path.key_parts) {
1747 parent_key += part;
1748 parent_key += kSep;
1749 RETURN_NOT_OK(impl_->CreateEmptyObject(path.bucket, parent_key));
1750 }
1751 return Status::OK();
1752 } else {
1753 // Check parent dir exists
1754 if (path.has_parent()) {
1755 S3Path parent_path = path.parent();
1756 bool exists;
1757 RETURN_NOT_OK(impl_->IsNonEmptyDirectory(parent_path, &exists));
1758 if (!exists) {
1759 RETURN_NOT_OK(impl_->IsEmptyDirectory(parent_path, &exists));
1760 }
1761 if (!exists) {
1762 return Status::IOError("Cannot create directory '", path.full_path,
1763 "': parent directory does not exist");
1764 }
1765 }
1766
1767 // XXX Should we check that no non-directory entry exists?
1768 // Minio does it for us, not sure about other S3 implementations.
1769 return impl_->CreateEmptyDir(path.bucket, path.key);
1770 }
1771 }
1772
1773 Status S3FileSystem::DeleteDir(const std::string& s) {
1774 ARROW_ASSIGN_OR_RAISE(auto path, S3Path::FromString(s));
1775
1776 if (path.empty()) {
1777 return Status::NotImplemented("Cannot delete all S3 buckets");
1778 }
1779 RETURN_NOT_OK(impl_->DeleteDirContents(path.bucket, path.key));
1780 if (path.key.empty()) {
1781 // Delete bucket
1782 S3Model::DeleteBucketRequest req;
1783 req.SetBucket(ToAwsString(path.bucket));
1784 return OutcomeToStatus(
1785 std::forward_as_tuple("When deleting bucket '", path.bucket, "': "),
1786 impl_->client_->DeleteBucket(req));
1787 } else {
1788 // Delete "directory"
1789 RETURN_NOT_OK(impl_->DeleteObject(path.bucket, path.key + kSep));
1790 // Parent may be implicitly deleted if it became empty, recreate it
1791 return impl_->EnsureParentExists(path);
1792 }
1793 }
1794
1795 Status S3FileSystem::DeleteDirContents(const std::string& s) {
1796 ARROW_ASSIGN_OR_RAISE(auto path, S3Path::FromString(s));
1797
1798 if (path.empty()) {
1799 return Status::NotImplemented("Cannot delete all S3 buckets");
1800 }
1801 RETURN_NOT_OK(impl_->DeleteDirContents(path.bucket, path.key));
1802 // Directory may be implicitly deleted, recreate it
1803 return impl_->EnsureDirectoryExists(path);
1804 }
1805
1806 Status S3FileSystem::DeleteRootDirContents() {
1807 return Status::NotImplemented("Cannot delete all S3 buckets");
1808 }
1809
1810 Status S3FileSystem::DeleteFile(const std::string& s) {
1811 ARROW_ASSIGN_OR_RAISE(auto path, S3Path::FromString(s));
1812 RETURN_NOT_OK(ValidateFilePath(path));
1813
1814 // Check the object exists
1815 S3Model::HeadObjectRequest req;
1816 req.SetBucket(ToAwsString(path.bucket));
1817 req.SetKey(ToAwsString(path.key));
1818
1819 auto outcome = impl_->client_->HeadObject(req);
1820 if (!outcome.IsSuccess()) {
1821 if (IsNotFound(outcome.GetError())) {
1822 return PathNotFound(path);
1823 } else {
1824 return ErrorToStatus(
1825 std::forward_as_tuple("When getting information for key '", path.key,
1826 "' in bucket '", path.bucket, "': "),
1827 outcome.GetError());
1828 }
1829 }
1830 // Object found, delete it
1831 RETURN_NOT_OK(impl_->DeleteObject(path.bucket, path.key));
1832 // Parent may be implicitly deleted if it became empty, recreate it
1833 return impl_->EnsureParentExists(path);
1834 }
1835
1836 Status S3FileSystem::Move(const std::string& src, const std::string& dest) {
1837 // XXX We don't implement moving directories as it would be too expensive:
1838 // one must copy all directory contents one by one (including object data),
1839 // then delete the original contents.
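// Note: a file move is implemented below as CopyObject followed by DeleteObject,
// so it is not atomic; a failure in between can leave both objects in place.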
1840
1841 ARROW_ASSIGN_OR_RAISE(auto src_path, S3Path::FromString(src));
1842 RETURN_NOT_OK(ValidateFilePath(src_path));
1843 ARROW_ASSIGN_OR_RAISE(auto dest_path, S3Path::FromString(dest));
1844 RETURN_NOT_OK(ValidateFilePath(dest_path));
1845
1846 if (src_path == dest_path) {
1847 return Status::OK();
1848 }
1849 RETURN_NOT_OK(impl_->CopyObject(src_path, dest_path));
1850 RETURN_NOT_OK(impl_->DeleteObject(src_path.bucket, src_path.key));
1851 // Source parent may be implicitly deleted if it became empty, recreate it
1852 return impl_->EnsureParentExists(src_path);
1853 }
1854
1855 Status S3FileSystem::CopyFile(const std::string& src, const std::string& dest) {
1856 ARROW_ASSIGN_OR_RAISE(auto src_path, S3Path::FromString(src));
1857 RETURN_NOT_OK(ValidateFilePath(src_path));
1858 ARROW_ASSIGN_OR_RAISE(auto dest_path, S3Path::FromString(dest));
1859 RETURN_NOT_OK(ValidateFilePath(dest_path));
1860
1861 if (src_path == dest_path) {
1862 return Status::OK();
1863 }
1864 return impl_->CopyObject(src_path, dest_path);
1865 }
1866
1867 Result<std::shared_ptr<io::InputStream>> S3FileSystem::OpenInputStream(
1868 const std::string& s) {
1869 return impl_->OpenInputFile(s, this);
1870 }
1871
1872 Result<std::shared_ptr<io::InputStream>> S3FileSystem::OpenInputStream(
1873 const FileInfo& info) {
1874 return impl_->OpenInputFile(info, this);
1875 }
1876
1877 Result<std::shared_ptr<io::RandomAccessFile>> S3FileSystem::OpenInputFile(
1878 const std::string& s) {
1879 return impl_->OpenInputFile(s, this);
1880 }
1881
1882 Result<std::shared_ptr<io::RandomAccessFile>> S3FileSystem::OpenInputFile(
1883 const FileInfo& info) {
1884 return impl_->OpenInputFile(info, this);
1885 }
1886
1887 Result<std::shared_ptr<io::OutputStream>> S3FileSystem::OpenOutputStream(
1888 const std::string& s) {
1889 ARROW_ASSIGN_OR_RAISE(auto path, S3Path::FromString(s));
1890 RETURN_NOT_OK(ValidateFilePath(path));
1891
1892 auto ptr = std::make_shared<ObjectOutputStream>(
1893 shared_from_this(), impl_->client_.get(), path, impl_->options());
1894 RETURN_NOT_OK(ptr->Init());
1895 return ptr;
1896 }
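// Example (sketch) of writing an object through this API; the path is
// hypothetical and `data`/`nbytes` stand for an existing buffer:
//
//   ARROW_ASSIGN_OR_RAISE(auto stream, fs->OpenOutputStream("my-bucket/key"));
//   RETURN_NOT_OK(stream->Write(data, nbytes));
//   RETURN_NOT_OK(stream->Close());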
1897
1898 Result<std::shared_ptr<io::OutputStream>> S3FileSystem::OpenAppendStream(
1899 const std::string& path) {
1900 // XXX Investigate UploadPartCopy? Does it work with source == destination?
1901 // https://docs.aws.amazon.com/AmazonS3/latest/API/mpUploadUploadPartCopy.html
1902 // (but would need to fall back to GET if the current data is < 5 MB)
1903 return Status::NotImplemented("It is not possible to append efficiently to S3 objects");
1904 }
1905
1906 //
1907 // Top-level utility functions
1908 //
1909
1910 Result<std::string> ResolveBucketRegion(const std::string& bucket) {
1911 ARROW_ASSIGN_OR_RAISE(auto resolver, RegionResolver::DefaultInstance());
1912 return resolver->ResolveRegion(bucket);
1913 }
1914
1915 } // namespace fs
1916 } // namespace arrow
1917