1 // Licensed to the Apache Software Foundation (ASF) under one
2 // or more contributor license agreements. See the NOTICE file
3 // distributed with this work for additional information
4 // regarding copyright ownership. The ASF licenses this file
5 // to you under the Apache License, Version 2.0 (the
6 // "License"); you may not use this file except in compliance
7 // with the License. You may obtain a copy of the License at
8 //
9 // http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing,
12 // software distributed under the License is distributed on an
13 // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14 // KIND, either express or implied. See the License for the
15 // specific language governing permissions and limitations
16 // under the License.
17
18 #include "arrow/filesystem/s3fs.h"
19
#include <algorithm>
#include <atomic>
#include <chrono>
#include <condition_variable>
#include <functional>
#include <memory>
#include <mutex>
#include <sstream>
#include <string>
#include <thread>
#include <unordered_map>
#include <utility>
31
32 #ifdef _WIN32
33 // Undefine preprocessor macros that interfere with AWS function / method names
34 #ifdef GetMessage
35 #undef GetMessage
36 #endif
37 #ifdef GetObject
38 #undef GetObject
39 #endif
40 #endif
41
42 #include <aws/core/Aws.h>
43 #include <aws/core/Region.h>
44 #include <aws/core/auth/AWSCredentials.h>
45 #include <aws/core/auth/AWSCredentialsProviderChain.h>
46 #include <aws/core/auth/STSCredentialsProvider.h>
47 #include <aws/core/client/DefaultRetryStrategy.h>
48 #include <aws/core/client/RetryStrategy.h>
49 #include <aws/core/http/HttpResponse.h>
50 #include <aws/core/utils/logging/ConsoleLogSystem.h>
51 #include <aws/core/utils/stream/PreallocatedStreamBuf.h>
52 #include <aws/core/utils/xml/XmlSerializer.h>
53 #include <aws/identity-management/auth/STSAssumeRoleCredentialsProvider.h>
54 #include <aws/s3/S3Client.h>
55 #include <aws/s3/model/AbortMultipartUploadRequest.h>
56 #include <aws/s3/model/CompleteMultipartUploadRequest.h>
57 #include <aws/s3/model/CompletedMultipartUpload.h>
58 #include <aws/s3/model/CompletedPart.h>
59 #include <aws/s3/model/CopyObjectRequest.h>
60 #include <aws/s3/model/CreateBucketRequest.h>
61 #include <aws/s3/model/CreateMultipartUploadRequest.h>
62 #include <aws/s3/model/DeleteBucketRequest.h>
63 #include <aws/s3/model/DeleteObjectRequest.h>
64 #include <aws/s3/model/DeleteObjectsRequest.h>
65 #include <aws/s3/model/GetObjectRequest.h>
66 #include <aws/s3/model/HeadBucketRequest.h>
67 #include <aws/s3/model/HeadObjectRequest.h>
68 #include <aws/s3/model/ListBucketsResult.h>
69 #include <aws/s3/model/ListObjectsV2Request.h>
70 #include <aws/s3/model/ObjectCannedACL.h>
71 #include <aws/s3/model/PutObjectRequest.h>
72 #include <aws/s3/model/UploadPartRequest.h>
73
74 #include "arrow/util/windows_fixup.h"
75
76 #include "arrow/buffer.h"
77 #include "arrow/filesystem/filesystem.h"
78 #include "arrow/filesystem/path_util.h"
79 #include "arrow/filesystem/s3_internal.h"
80 #include "arrow/filesystem/util_internal.h"
81 #include "arrow/io/interfaces.h"
82 #include "arrow/io/memory.h"
83 #include "arrow/io/util_internal.h"
84 #include "arrow/result.h"
85 #include "arrow/status.h"
86 #include "arrow/util/async_generator.h"
87 #include "arrow/util/atomic_shared_ptr.h"
88 #include "arrow/util/checked_cast.h"
89 #include "arrow/util/future.h"
90 #include "arrow/util/key_value_metadata.h"
91 #include "arrow/util/logging.h"
92 #include "arrow/util/optional.h"
93 #include "arrow/util/task_group.h"
94 #include "arrow/util/thread_pool.h"
95
96 namespace arrow {
97
98 using internal::TaskGroup;
99 using internal::Uri;
100 using io::internal::SubmitIO;
101
102 namespace fs {
103
104 using ::Aws::Client::AWSError;
105 using ::Aws::S3::S3Errors;
106 namespace S3Model = Aws::S3::Model;
107
108 using internal::ConnectRetryStrategy;
109 using internal::DetectS3Backend;
110 using internal::ErrorToStatus;
111 using internal::FromAwsDatetime;
112 using internal::FromAwsString;
113 using internal::IsAlreadyExists;
114 using internal::IsNotFound;
115 using internal::OutcomeToResult;
116 using internal::OutcomeToStatus;
117 using internal::S3Backend;
118 using internal::ToAwsString;
119 using internal::ToURLEncodedAwsString;
120
// Separator between the bucket name and the components of an object key.
constexpr char kSep = '/';
122
123 namespace {
124
125 std::mutex aws_init_lock;
126 Aws::SDKOptions aws_options;
127 std::atomic<bool> aws_initialized(false);
128
DoInitializeS3(const S3GlobalOptions & options)129 Status DoInitializeS3(const S3GlobalOptions& options) {
130 Aws::Utils::Logging::LogLevel aws_log_level;
131
132 #define LOG_LEVEL_CASE(level_name) \
133 case S3LogLevel::level_name: \
134 aws_log_level = Aws::Utils::Logging::LogLevel::level_name; \
135 break;
136
137 switch (options.log_level) {
138 LOG_LEVEL_CASE(Fatal)
139 LOG_LEVEL_CASE(Error)
140 LOG_LEVEL_CASE(Warn)
141 LOG_LEVEL_CASE(Info)
142 LOG_LEVEL_CASE(Debug)
143 LOG_LEVEL_CASE(Trace)
144 default:
145 aws_log_level = Aws::Utils::Logging::LogLevel::Off;
146 }
147
148 #undef LOG_LEVEL_CASE
149
150 aws_options.loggingOptions.logLevel = aws_log_level;
151 // By default the AWS SDK logs to files, log to console instead
152 aws_options.loggingOptions.logger_create_fn = [] {
153 return std::make_shared<Aws::Utils::Logging::ConsoleLogSystem>(
154 aws_options.loggingOptions.logLevel);
155 };
156 Aws::InitAPI(aws_options);
157 aws_initialized.store(true);
158 return Status::OK();
159 }
160
161 } // namespace
162
InitializeS3(const S3GlobalOptions & options)163 Status InitializeS3(const S3GlobalOptions& options) {
164 std::lock_guard<std::mutex> lock(aws_init_lock);
165 return DoInitializeS3(options);
166 }
167
FinalizeS3()168 Status FinalizeS3() {
169 std::lock_guard<std::mutex> lock(aws_init_lock);
170 Aws::ShutdownAPI(aws_options);
171 aws_initialized.store(false);
172 return Status::OK();
173 }
174
EnsureS3Initialized()175 Status EnsureS3Initialized() {
176 std::lock_guard<std::mutex> lock(aws_init_lock);
177 if (!aws_initialized.load()) {
178 S3GlobalOptions options{S3LogLevel::Fatal};
179 return DoInitializeS3(options);
180 }
181 return Status::OK();
182 }
183
184 // -----------------------------------------------------------------------
185 // S3ProxyOptions implementation
186
FromUri(const Uri & uri)187 Result<S3ProxyOptions> S3ProxyOptions::FromUri(const Uri& uri) {
188 S3ProxyOptions options;
189
190 options.scheme = uri.scheme();
191 options.host = uri.host();
192 options.port = uri.port();
193 options.username = uri.username();
194 options.password = uri.password();
195
196 return options;
197 }
198
FromUri(const std::string & uri_string)199 Result<S3ProxyOptions> S3ProxyOptions::FromUri(const std::string& uri_string) {
200 Uri uri;
201 RETURN_NOT_OK(uri.Parse(uri_string));
202 return FromUri(uri);
203 }
204
Equals(const S3ProxyOptions & other) const205 bool S3ProxyOptions::Equals(const S3ProxyOptions& other) const {
206 return (scheme == other.scheme && host == other.host && port == other.port &&
207 username == other.username && password == other.password);
208 }
209
210 // -----------------------------------------------------------------------
211 // S3Options implementation
212
ConfigureDefaultCredentials()213 void S3Options::ConfigureDefaultCredentials() {
214 credentials_provider =
215 std::make_shared<Aws::Auth::DefaultAWSCredentialsProviderChain>();
216 credentials_kind = S3CredentialsKind::Default;
217 }
218
ConfigureAnonymousCredentials()219 void S3Options::ConfigureAnonymousCredentials() {
220 credentials_provider = std::make_shared<Aws::Auth::AnonymousAWSCredentialsProvider>();
221 credentials_kind = S3CredentialsKind::Anonymous;
222 }
223
ConfigureAccessKey(const std::string & access_key,const std::string & secret_key,const std::string & session_token)224 void S3Options::ConfigureAccessKey(const std::string& access_key,
225 const std::string& secret_key,
226 const std::string& session_token) {
227 credentials_provider = std::make_shared<Aws::Auth::SimpleAWSCredentialsProvider>(
228 ToAwsString(access_key), ToAwsString(secret_key), ToAwsString(session_token));
229 credentials_kind = S3CredentialsKind::Explicit;
230 }
231
ConfigureAssumeRoleCredentials(const std::string & role_arn,const std::string & session_name,const std::string & external_id,int load_frequency,const std::shared_ptr<Aws::STS::STSClient> & stsClient)232 void S3Options::ConfigureAssumeRoleCredentials(
233 const std::string& role_arn, const std::string& session_name,
234 const std::string& external_id, int load_frequency,
235 const std::shared_ptr<Aws::STS::STSClient>& stsClient) {
236 credentials_provider = std::make_shared<Aws::Auth::STSAssumeRoleCredentialsProvider>(
237 ToAwsString(role_arn), ToAwsString(session_name), ToAwsString(external_id),
238 load_frequency, stsClient);
239 credentials_kind = S3CredentialsKind::Role;
240 }
241
ConfigureAssumeRoleWithWebIdentityCredentials()242 void S3Options::ConfigureAssumeRoleWithWebIdentityCredentials() {
243 // The AWS SDK uses environment variables AWS_DEFAULT_REGION,
244 // AWS_ROLE_ARN, AWS_WEB_IDENTITY_TOKEN_FILE and AWS_ROLE_SESSION_NAME
245 // to configure the required credentials
246 credentials_provider =
247 std::make_shared<Aws::Auth::STSAssumeRoleWebIdentityCredentialsProvider>();
248 credentials_kind = S3CredentialsKind::WebIdentity;
249 }
250
GetAccessKey() const251 std::string S3Options::GetAccessKey() const {
252 auto credentials = credentials_provider->GetAWSCredentials();
253 return std::string(FromAwsString(credentials.GetAWSAccessKeyId()));
254 }
255
GetSecretKey() const256 std::string S3Options::GetSecretKey() const {
257 auto credentials = credentials_provider->GetAWSCredentials();
258 return std::string(FromAwsString(credentials.GetAWSSecretKey()));
259 }
260
GetSessionToken() const261 std::string S3Options::GetSessionToken() const {
262 auto credentials = credentials_provider->GetAWSCredentials();
263 return std::string(FromAwsString(credentials.GetSessionToken()));
264 }
265
Defaults()266 S3Options S3Options::Defaults() {
267 S3Options options;
268 options.ConfigureDefaultCredentials();
269 return options;
270 }
271
Anonymous()272 S3Options S3Options::Anonymous() {
273 S3Options options;
274 options.ConfigureAnonymousCredentials();
275 return options;
276 }
277
FromAccessKey(const std::string & access_key,const std::string & secret_key,const std::string & session_token)278 S3Options S3Options::FromAccessKey(const std::string& access_key,
279 const std::string& secret_key,
280 const std::string& session_token) {
281 S3Options options;
282 options.ConfigureAccessKey(access_key, secret_key, session_token);
283 return options;
284 }
285
FromAssumeRole(const std::string & role_arn,const std::string & session_name,const std::string & external_id,int load_frequency,const std::shared_ptr<Aws::STS::STSClient> & stsClient)286 S3Options S3Options::FromAssumeRole(
287 const std::string& role_arn, const std::string& session_name,
288 const std::string& external_id, int load_frequency,
289 const std::shared_ptr<Aws::STS::STSClient>& stsClient) {
290 S3Options options;
291 options.role_arn = role_arn;
292 options.session_name = session_name;
293 options.external_id = external_id;
294 options.load_frequency = load_frequency;
295 options.ConfigureAssumeRoleCredentials(role_arn, session_name, external_id,
296 load_frequency, stsClient);
297 return options;
298 }
299
FromAssumeRoleWithWebIdentity()300 S3Options S3Options::FromAssumeRoleWithWebIdentity() {
301 S3Options options;
302 options.ConfigureAssumeRoleWithWebIdentityCredentials();
303 return options;
304 }
305
FromUri(const Uri & uri,std::string * out_path)306 Result<S3Options> S3Options::FromUri(const Uri& uri, std::string* out_path) {
307 S3Options options;
308
309 const auto bucket = uri.host();
310 auto path = uri.path();
311 if (bucket.empty()) {
312 if (!path.empty()) {
313 return Status::Invalid("Missing bucket name in S3 URI");
314 }
315 } else {
316 if (path.empty()) {
317 path = bucket;
318 } else {
319 if (path[0] != '/') {
320 return Status::Invalid("S3 URI should absolute, not relative");
321 }
322 path = bucket + path;
323 }
324 }
325 if (out_path != nullptr) {
326 *out_path = std::string(internal::RemoveTrailingSlash(path));
327 }
328
329 std::unordered_map<std::string, std::string> options_map;
330 ARROW_ASSIGN_OR_RAISE(const auto options_items, uri.query_items());
331 for (const auto& kv : options_items) {
332 options_map.emplace(kv.first, kv.second);
333 }
334
335 const auto username = uri.username();
336 if (!username.empty()) {
337 options.ConfigureAccessKey(username, uri.password());
338 } else {
339 options.ConfigureDefaultCredentials();
340 }
341
342 bool region_set = false;
343 for (const auto& kv : options_map) {
344 if (kv.first == "region") {
345 options.region = kv.second;
346 region_set = true;
347 } else if (kv.first == "scheme") {
348 options.scheme = kv.second;
349 } else if (kv.first == "endpoint_override") {
350 options.endpoint_override = kv.second;
351 } else {
352 return Status::Invalid("Unexpected query parameter in S3 URI: '", kv.first, "'");
353 }
354 }
355
356 if (!region_set && !bucket.empty() && options.endpoint_override.empty()) {
357 // XXX Should we use a dedicated resolver with the given credentials?
358 ARROW_ASSIGN_OR_RAISE(options.region, ResolveBucketRegion(bucket));
359 }
360
361 return options;
362 }
363
FromUri(const std::string & uri_string,std::string * out_path)364 Result<S3Options> S3Options::FromUri(const std::string& uri_string,
365 std::string* out_path) {
366 Uri uri;
367 RETURN_NOT_OK(uri.Parse(uri_string));
368 return FromUri(uri, out_path);
369 }
370
Equals(const S3Options & other) const371 bool S3Options::Equals(const S3Options& other) const {
372 return (region == other.region && endpoint_override == other.endpoint_override &&
373 scheme == other.scheme && background_writes == other.background_writes &&
374 credentials_kind == other.credentials_kind &&
375 proxy_options.Equals(other.proxy_options) &&
376 GetAccessKey() == other.GetAccessKey() &&
377 GetSecretKey() == other.GetSecretKey() &&
378 GetSessionToken() == other.GetSessionToken());
379 }
380
381 namespace {
382
CheckS3Initialized()383 Status CheckS3Initialized() {
384 if (!aws_initialized.load()) {
385 return Status::Invalid(
386 "S3 subsystem not initialized; please call InitializeS3() "
387 "before carrying out any S3-related operation");
388 }
389 return Status::OK();
390 }
391
392 // XXX Sanitize paths by removing leading slash?
393
394 struct S3Path {
395 std::string full_path;
396 std::string bucket;
397 std::string key;
398 std::vector<std::string> key_parts;
399
FromStringarrow::fs::__anon5c130cae0311::S3Path400 static Result<S3Path> FromString(const std::string& s) {
401 const auto src = internal::RemoveTrailingSlash(s);
402 auto first_sep = src.find_first_of(kSep);
403 if (first_sep == 0) {
404 return Status::Invalid("Path cannot start with a separator ('", s, "')");
405 }
406 if (first_sep == std::string::npos) {
407 return S3Path{std::string(src), std::string(src), "", {}};
408 }
409 S3Path path;
410 path.full_path = std::string(src);
411 path.bucket = std::string(src.substr(0, first_sep));
412 path.key = std::string(src.substr(first_sep + 1));
413 path.key_parts = internal::SplitAbstractPath(path.key);
414 RETURN_NOT_OK(Validate(&path));
415 return path;
416 }
417
Validatearrow::fs::__anon5c130cae0311::S3Path418 static Status Validate(const S3Path* path) {
419 auto result = internal::ValidateAbstractPathParts(path->key_parts);
420 if (!result.ok()) {
421 return Status::Invalid(result.message(), " in path ", path->full_path);
422 } else {
423 return result;
424 }
425 }
426
ToAwsStringarrow::fs::__anon5c130cae0311::S3Path427 Aws::String ToAwsString() const {
428 Aws::String res(bucket.begin(), bucket.end());
429 res.reserve(bucket.size() + key.size() + 1);
430 res += kSep;
431 res.append(key.begin(), key.end());
432 return res;
433 }
434
ToURLEncodedAwsStringarrow::fs::__anon5c130cae0311::S3Path435 Aws::String ToURLEncodedAwsString() const {
436 // URL-encode individual parts, not the '/' separator
437 Aws::String res;
438 res += internal::ToURLEncodedAwsString(bucket);
439 for (const auto& part : key_parts) {
440 res += kSep;
441 res += internal::ToURLEncodedAwsString(part);
442 }
443 return res;
444 }
445
parentarrow::fs::__anon5c130cae0311::S3Path446 S3Path parent() const {
447 DCHECK(!key_parts.empty());
448 auto parent = S3Path{"", bucket, "", key_parts};
449 parent.key_parts.pop_back();
450 parent.key = internal::JoinAbstractPath(parent.key_parts);
451 parent.full_path = parent.bucket + kSep + parent.key;
452 return parent;
453 }
454
has_parentarrow::fs::__anon5c130cae0311::S3Path455 bool has_parent() const { return !key.empty(); }
456
emptyarrow::fs::__anon5c130cae0311::S3Path457 bool empty() const { return bucket.empty() && key.empty(); }
458
operator ==arrow::fs::__anon5c130cae0311::S3Path459 bool operator==(const S3Path& other) const {
460 return bucket == other.bucket && key == other.key;
461 }
462 };
463
464 // XXX return in OutcomeToStatus instead?
PathNotFound(const S3Path & path)465 Status PathNotFound(const S3Path& path) {
466 return ::arrow::fs::internal::PathNotFound(path.full_path);
467 }
468
PathNotFound(const std::string & bucket,const std::string & key)469 Status PathNotFound(const std::string& bucket, const std::string& key) {
470 return ::arrow::fs::internal::PathNotFound(bucket + kSep + key);
471 }
472
NotAFile(const S3Path & path)473 Status NotAFile(const S3Path& path) {
474 return ::arrow::fs::internal::NotAFile(path.full_path);
475 }
476
ValidateFilePath(const S3Path & path)477 Status ValidateFilePath(const S3Path& path) {
478 if (path.bucket.empty() || path.key.empty()) {
479 return NotAFile(path);
480 }
481 return Status::OK();
482 }
483
// Format an HTTP Range header value selecting `length` bytes starting at byte
// offset `start`: "bytes=<first>-<last>", both ends inclusive. A non-positive
// `length` yields an inverted (empty) range, so callers should pass length > 0.
//
// std::to_string is used instead of a stringstream: stream output honours the
// global C++ locale, which may insert digit-group separators (e.g. "1,000")
// and produce an invalid header.
std::string FormatRange(int64_t start, int64_t length) {
  const int64_t last = start + length - 1;
  std::string range = "bytes=";
  range += std::to_string(start);
  range += '-';
  range += std::to_string(last);
  return range;
}
490
491 // An AWS RetryStrategy that wraps a provided arrow::fs::S3RetryStrategy
492 class WrappedRetryStrategy : public Aws::Client::RetryStrategy {
493 public:
WrappedRetryStrategy(const std::shared_ptr<S3RetryStrategy> & s3_retry_strategy)494 explicit WrappedRetryStrategy(const std::shared_ptr<S3RetryStrategy>& s3_retry_strategy)
495 : s3_retry_strategy_(s3_retry_strategy) {}
496
ShouldRetry(const Aws::Client::AWSError<Aws::Client::CoreErrors> & error,long attempted_retries) const497 bool ShouldRetry(const Aws::Client::AWSError<Aws::Client::CoreErrors>& error,
498 long attempted_retries) const override { // NOLINT runtime/int
499 S3RetryStrategy::AWSErrorDetail detail = ErrorToDetail(error);
500 return s3_retry_strategy_->ShouldRetry(detail,
501 static_cast<int64_t>(attempted_retries));
502 }
503
CalculateDelayBeforeNextRetry(const Aws::Client::AWSError<Aws::Client::CoreErrors> & error,long attempted_retries) const504 long CalculateDelayBeforeNextRetry( // NOLINT runtime/int
505 const Aws::Client::AWSError<Aws::Client::CoreErrors>& error,
506 long attempted_retries) const override { // NOLINT runtime/int
507 S3RetryStrategy::AWSErrorDetail detail = ErrorToDetail(error);
508 return static_cast<long>( // NOLINT runtime/int
509 s3_retry_strategy_->CalculateDelayBeforeNextRetry(
510 detail, static_cast<int64_t>(attempted_retries)));
511 }
512
513 private:
514 template <typename ErrorType>
ErrorToDetail(const Aws::Client::AWSError<ErrorType> & error)515 static S3RetryStrategy::AWSErrorDetail ErrorToDetail(
516 const Aws::Client::AWSError<ErrorType>& error) {
517 S3RetryStrategy::AWSErrorDetail detail;
518 detail.error_type = static_cast<int>(error.GetErrorType());
519 detail.message = std::string(FromAwsString(error.GetMessage()));
520 detail.exception_name = std::string(FromAwsString(error.GetExceptionName()));
521 detail.should_retry = error.ShouldRetry();
522 return detail;
523 }
524
525 std::shared_ptr<S3RetryStrategy> s3_retry_strategy_;
526 };
527
528 class S3Client : public Aws::S3::S3Client {
529 public:
530 using Aws::S3::S3Client::S3Client;
531
532 // To get a bucket's region, we must extract the "x-amz-bucket-region" header
533 // from the response to a HEAD bucket request.
534 // Unfortunately, the S3Client APIs don't let us access the headers of successful
535 // responses. So we have to cook a AWS request and issue it ourselves.
536
GetBucketRegion(const S3Model::HeadBucketRequest & request)537 Result<std::string> GetBucketRegion(const S3Model::HeadBucketRequest& request) {
538 auto uri = GeneratePresignedUrl(request.GetBucket(),
539 /*key=*/"", Aws::Http::HttpMethod::HTTP_HEAD);
540 // NOTE: The signer region argument isn't passed here, as there's no easy
541 // way of computing it (the relevant method is private).
542 auto outcome = MakeRequest(uri, request, Aws::Http::HttpMethod::HTTP_HEAD,
543 Aws::Auth::SIGV4_SIGNER);
544 const auto code = outcome.IsSuccess() ? outcome.GetResult().GetResponseCode()
545 : outcome.GetError().GetResponseCode();
546 const auto& headers = outcome.IsSuccess()
547 ? outcome.GetResult().GetHeaderValueCollection()
548 : outcome.GetError().GetResponseHeaders();
549
550 const auto it = headers.find(ToAwsString("x-amz-bucket-region"));
551 if (it == headers.end()) {
552 if (code == Aws::Http::HttpResponseCode::NOT_FOUND) {
553 return Status::IOError("Bucket '", request.GetBucket(), "' not found");
554 } else if (!outcome.IsSuccess()) {
555 return ErrorToStatus(std::forward_as_tuple("When resolving region for bucket '",
556 request.GetBucket(), "': "),
557 outcome.GetError());
558 } else {
559 return Status::IOError("When resolving region for bucket '", request.GetBucket(),
560 "': missing 'x-amz-bucket-region' header in response");
561 }
562 }
563 return std::string(FromAwsString(it->second));
564 }
565
GetBucketRegion(const std::string & bucket)566 Result<std::string> GetBucketRegion(const std::string& bucket) {
567 S3Model::HeadBucketRequest req;
568 req.SetBucket(ToAwsString(bucket));
569 return GetBucketRegion(req);
570 }
571
CompleteMultipartUploadWithErrorFixup(S3Model::CompleteMultipartUploadRequest && request) const572 S3Model::CompleteMultipartUploadOutcome CompleteMultipartUploadWithErrorFixup(
573 S3Model::CompleteMultipartUploadRequest&& request) const {
574 // CompletedMultipartUpload can return a 200 OK response with an error
575 // encoded in the response body, in which case we should either retry
576 // or propagate the error to the user (see
577 // https://docs.aws.amazon.com/AmazonS3/latest/API/API_CompleteMultipartUpload.html).
578 //
579 // Unfortunately the AWS SDK doesn't detect such situations but lets them
580 // return successfully (see https://github.com/aws/aws-sdk-cpp/issues/658).
581 //
582 // We work around the issue by registering a DataReceivedEventHandler
583 // which parses the XML response for embedded errors.
584
585 util::optional<AWSError<Aws::Client::CoreErrors>> aws_error;
586
587 auto handler = [&](const Aws::Http::HttpRequest* http_req,
588 Aws::Http::HttpResponse* http_resp,
589 long long) { // NOLINT runtime/int
590 auto& stream = http_resp->GetResponseBody();
591 const auto pos = stream.tellg();
592 const auto doc = Aws::Utils::Xml::XmlDocument::CreateFromXmlStream(stream);
593 // Rewind stream for later
594 stream.clear();
595 stream.seekg(pos);
596
597 if (doc.WasParseSuccessful()) {
598 auto root = doc.GetRootElement();
599 if (!root.IsNull()) {
600 // Detect something that looks like an abnormal CompletedMultipartUpload
601 // response.
602 if (root.GetName() != "CompleteMultipartUploadResult" ||
603 !root.FirstChild("Error").IsNull() || !root.FirstChild("Errors").IsNull()) {
604 // Make sure the error marshaller doesn't see a 200 OK
605 http_resp->SetResponseCode(
606 Aws::Http::HttpResponseCode::INTERNAL_SERVER_ERROR);
607 aws_error = GetErrorMarshaller()->Marshall(*http_resp);
608 // Rewind stream for later
609 stream.clear();
610 stream.seekg(pos);
611 }
612 }
613 }
614 };
615
616 request.SetDataReceivedEventHandler(std::move(handler));
617
618 // We don't have access to the configured AWS retry strategy
619 // (m_retryStrategy is a private member of AwsClient), so don't use that.
620 std::unique_ptr<Aws::Client::RetryStrategy> retry_strategy;
621 if (s3_retry_strategy_) {
622 retry_strategy.reset(new WrappedRetryStrategy(s3_retry_strategy_));
623 } else {
624 // Note that DefaultRetryStrategy, unlike StandardRetryStrategy,
625 // has empty definitions for RequestBookkeeping() and GetSendToken(),
626 // which simplifies the code below.
627 retry_strategy.reset(new Aws::Client::DefaultRetryStrategy());
628 }
629
630 for (int32_t retries = 0;; retries++) {
631 aws_error.reset();
632 auto outcome = Aws::S3::S3Client::S3Client::CompleteMultipartUpload(request);
633 if (!outcome.IsSuccess()) {
634 // Error returned in HTTP headers (or client failure)
635 return outcome;
636 }
637 if (!aws_error.has_value()) {
638 // Genuinely successful outcome
639 return outcome;
640 }
641
642 const bool should_retry = retry_strategy->ShouldRetry(*aws_error, retries);
643
644 ARROW_LOG(WARNING)
645 << "CompletedMultipartUpload got error embedded in a 200 OK response: "
646 << aws_error->GetExceptionName() << " (\"" << aws_error->GetMessage()
647 << "\"), retry = " << should_retry;
648
649 if (!should_retry) {
650 break;
651 }
652 const auto delay = std::chrono::milliseconds(
653 retry_strategy->CalculateDelayBeforeNextRetry(*aws_error, retries));
654 std::this_thread::sleep_for(delay);
655 }
656
657 DCHECK(aws_error.has_value());
658 auto s3_error = AWSError<S3Errors>(std::move(aws_error).value());
659 return S3Model::CompleteMultipartUploadOutcome(std::move(s3_error));
660 }
661
662 std::shared_ptr<S3RetryStrategy> s3_retry_strategy_;
663 };
664
// Overload for AWS SDK < 1.8, where ClientConfiguration::followRedirects is a
// plain bool. It is a template (with an unused parameter) presumably so that it
// does not trigger unused-function warnings when the other overload is the one
// selected.
template <bool Never = false>
void DisableRedirectsImpl(bool* followRedirects) {
  bool& follow = *followRedirects;
  follow = false;
}
670
// Overload for AWS SDK >= 1.8, where followRedirects is a scoped enum
// (Aws::Client::FollowRedirectsPolicy); sets it to the enum's NEVER value.
template <typename PolicyEnum, PolicyEnum Never = PolicyEnum::NEVER>
void DisableRedirectsImpl(PolicyEnum* followRedirects) {
  PolicyEnum& policy = *followRedirects;
  policy = Never;
}
676
DisableRedirects(Aws::Client::ClientConfiguration * c)677 void DisableRedirects(Aws::Client::ClientConfiguration* c) {
678 DisableRedirectsImpl(&c->followRedirects);
679 }
680
681 class ClientBuilder {
682 public:
ClientBuilder(S3Options options)683 explicit ClientBuilder(S3Options options) : options_(std::move(options)) {}
684
config() const685 const Aws::Client::ClientConfiguration& config() const { return client_config_; }
686
mutable_config()687 Aws::Client::ClientConfiguration* mutable_config() { return &client_config_; }
688
BuildClient()689 Result<std::shared_ptr<S3Client>> BuildClient() {
690 credentials_provider_ = options_.credentials_provider;
691 if (!options_.region.empty()) {
692 client_config_.region = ToAwsString(options_.region);
693 }
694 client_config_.endpointOverride = ToAwsString(options_.endpoint_override);
695 if (options_.scheme == "http") {
696 client_config_.scheme = Aws::Http::Scheme::HTTP;
697 } else if (options_.scheme == "https") {
698 client_config_.scheme = Aws::Http::Scheme::HTTPS;
699 } else {
700 return Status::Invalid("Invalid S3 connection scheme '", options_.scheme, "'");
701 }
702 if (options_.retry_strategy) {
703 client_config_.retryStrategy =
704 std::make_shared<WrappedRetryStrategy>(options_.retry_strategy);
705 } else {
706 client_config_.retryStrategy = std::make_shared<ConnectRetryStrategy>();
707 }
708 if (!internal::global_options.tls_ca_file_path.empty()) {
709 client_config_.caFile = ToAwsString(internal::global_options.tls_ca_file_path);
710 }
711 if (!internal::global_options.tls_ca_dir_path.empty()) {
712 client_config_.caPath = ToAwsString(internal::global_options.tls_ca_dir_path);
713 }
714
715 const bool use_virtual_addressing = options_.endpoint_override.empty();
716
717 // Set proxy options if provided
718 if (!options_.proxy_options.scheme.empty()) {
719 if (options_.proxy_options.scheme == "http") {
720 client_config_.proxyScheme = Aws::Http::Scheme::HTTP;
721 } else if (options_.proxy_options.scheme == "https") {
722 client_config_.proxyScheme = Aws::Http::Scheme::HTTPS;
723 } else {
724 return Status::Invalid("Invalid proxy connection scheme '",
725 options_.proxy_options.scheme, "'");
726 }
727 }
728 if (!options_.proxy_options.host.empty()) {
729 client_config_.proxyHost = ToAwsString(options_.proxy_options.host);
730 }
731 if (options_.proxy_options.port != -1) {
732 client_config_.proxyPort = options_.proxy_options.port;
733 }
734 if (!options_.proxy_options.username.empty()) {
735 client_config_.proxyUserName = ToAwsString(options_.proxy_options.username);
736 }
737 if (!options_.proxy_options.password.empty()) {
738 client_config_.proxyPassword = ToAwsString(options_.proxy_options.password);
739 }
740
741 auto client = std::make_shared<S3Client>(
742 credentials_provider_, client_config_,
743 Aws::Client::AWSAuthV4Signer::PayloadSigningPolicy::Never,
744 use_virtual_addressing);
745 client->s3_retry_strategy_ = options_.retry_strategy;
746 return client;
747 }
748
options() const749 const S3Options& options() const { return options_; }
750
751 protected:
752 S3Options options_;
753 Aws::Client::ClientConfiguration client_config_;
754 std::shared_ptr<Aws::Auth::AWSCredentialsProvider> credentials_provider_;
755 };
756
757 // -----------------------------------------------------------------------
758 // S3 region resolver
759
760 class RegionResolver {
761 public:
Make(S3Options options)762 static Result<std::shared_ptr<RegionResolver>> Make(S3Options options) {
763 std::shared_ptr<RegionResolver> resolver(new RegionResolver(std::move(options)));
764 RETURN_NOT_OK(resolver->Init());
765 return resolver;
766 }
767
DefaultInstance()768 static Result<std::shared_ptr<RegionResolver>> DefaultInstance() {
769 static std::shared_ptr<RegionResolver> instance;
770 auto resolver = arrow::internal::atomic_load(&instance);
771 if (resolver) {
772 return resolver;
773 }
774 auto maybe_resolver = Make(S3Options::Anonymous());
775 if (!maybe_resolver.ok()) {
776 return maybe_resolver;
777 }
778 // Make sure to always return the same instance even if several threads
779 // call DefaultInstance at once.
780 std::shared_ptr<RegionResolver> existing;
781 if (arrow::internal::atomic_compare_exchange_strong(&instance, &existing,
782 *maybe_resolver)) {
783 return *maybe_resolver;
784 } else {
785 return existing;
786 }
787 }
788
ResolveRegion(const std::string & bucket)789 Result<std::string> ResolveRegion(const std::string& bucket) {
790 std::unique_lock<std::mutex> lock(cache_mutex_);
791 auto it = cache_.find(bucket);
792 if (it != cache_.end()) {
793 return it->second;
794 }
795 lock.unlock();
796 ARROW_ASSIGN_OR_RAISE(auto region, ResolveRegionUncached(bucket));
797 lock.lock();
798 // Note we don't cache a non-existent bucket, as the bucket could be created later
799 cache_[bucket] = region;
800 return region;
801 }
802
ResolveRegionUncached(const std::string & bucket)803 Result<std::string> ResolveRegionUncached(const std::string& bucket) {
804 return client_->GetBucketRegion(bucket);
805 }
806
807 protected:
RegionResolver(S3Options options)808 explicit RegionResolver(S3Options options) : builder_(std::move(options)) {}
809
Init()810 Status Init() {
811 DCHECK(builder_.options().endpoint_override.empty());
812 // On Windows with AWS SDK >= 1.8, it is necessary to disable redirects (ARROW-10085).
813 DisableRedirects(builder_.mutable_config());
814 return builder_.BuildClient().Value(&client_);
815 }
816
817 ClientBuilder builder_;
818 std::shared_ptr<S3Client> client_;
819
820 std::mutex cache_mutex_;
821 // XXX Should cache size be bounded? It must be quite unusual to query millions
822 // of different buckets in a single program invocation...
823 std::unordered_map<std::string, std::string> cache_;
824 };
825
826 // -----------------------------------------------------------------------
827 // S3 file stream implementations
828
829 // A non-copying iostream.
830 // See https://stackoverflow.com/questions/35322033/aws-c-sdk-uploadpart-times-out
831 // https://stackoverflow.com/questions/13059091/creating-an-input-stream-from-constant-memory
832 class StringViewStream : Aws::Utils::Stream::PreallocatedStreamBuf, public std::iostream {
833 public:
StringViewStream(const void * data,int64_t nbytes)834 StringViewStream(const void* data, int64_t nbytes)
835 : Aws::Utils::Stream::PreallocatedStreamBuf(
836 reinterpret_cast<unsigned char*>(const_cast<void*>(data)),
837 static_cast<size_t>(nbytes)),
838 std::iostream(this) {}
839 };
840
841 // By default, the AWS SDK reads object data into an auto-growing StringStream.
842 // To avoid copies, read directly into our preallocated buffer instead.
843 // See https://github.com/aws/aws-sdk-cpp/issues/64 for an alternative but
844 // functionally similar recipe.
AwsWriteableStreamFactory(void * data,int64_t nbytes)845 Aws::IOStreamFactory AwsWriteableStreamFactory(void* data, int64_t nbytes) {
846 return [=]() { return Aws::New<StringViewStream>("", data, nbytes); };
847 }
848
GetObjectRange(Aws::S3::S3Client * client,const S3Path & path,int64_t start,int64_t length,void * out)849 Result<S3Model::GetObjectResult> GetObjectRange(Aws::S3::S3Client* client,
850 const S3Path& path, int64_t start,
851 int64_t length, void* out) {
852 S3Model::GetObjectRequest req;
853 req.SetBucket(ToAwsString(path.bucket));
854 req.SetKey(ToAwsString(path.key));
855 req.SetRange(ToAwsString(FormatRange(start, length)));
856 req.SetResponseStreamFactory(AwsWriteableStreamFactory(out, length));
857 return OutcomeToResult(client->GetObject(req));
858 }
859
860 template <typename ObjectResult>
GetObjectMetadata(const ObjectResult & result)861 std::shared_ptr<const KeyValueMetadata> GetObjectMetadata(const ObjectResult& result) {
862 auto md = std::make_shared<KeyValueMetadata>();
863
864 auto push = [&](std::string k, const Aws::String& v) {
865 if (!v.empty()) {
866 md->Append(std::move(k), FromAwsString(v).to_string());
867 }
868 };
869 auto push_datetime = [&](std::string k, const Aws::Utils::DateTime& v) {
870 if (v != Aws::Utils::DateTime(0.0)) {
871 push(std::move(k), v.ToGmtString(Aws::Utils::DateFormat::ISO_8601));
872 }
873 };
874
875 md->Append("Content-Length", std::to_string(result.GetContentLength()));
876 push("Cache-Control", result.GetCacheControl());
877 push("Content-Type", result.GetContentType());
878 push("Content-Language", result.GetContentLanguage());
879 push("ETag", result.GetETag());
880 push("VersionId", result.GetVersionId());
881 push_datetime("Last-Modified", result.GetLastModified());
882 push_datetime("Expires", result.GetExpires());
883 // NOTE the "canned ACL" isn't available for reading (one can get an expanded
884 // ACL using a separate GetObjectAcl request)
885 return md;
886 }
887
888 template <typename ObjectRequest>
889 struct ObjectMetadataSetter {
890 using Setter = std::function<Status(const std::string& value, ObjectRequest* req)>;
891
GetSettersarrow::fs::__anon5c130cae0311::ObjectMetadataSetter892 static std::unordered_map<std::string, Setter> GetSetters() {
893 return {{"ACL", CannedACLSetter()},
894 {"Cache-Control", StringSetter(&ObjectRequest::SetCacheControl)},
895 {"Content-Type", StringSetter(&ObjectRequest::SetContentType)},
896 {"Content-Language", StringSetter(&ObjectRequest::SetContentLanguage)},
897 {"Expires", DateTimeSetter(&ObjectRequest::SetExpires)}};
898 }
899
900 private:
StringSetterarrow::fs::__anon5c130cae0311::ObjectMetadataSetter901 static Setter StringSetter(void (ObjectRequest::*req_method)(Aws::String&&)) {
902 return [req_method](const std::string& v, ObjectRequest* req) {
903 (req->*req_method)(ToAwsString(v));
904 return Status::OK();
905 };
906 }
907
DateTimeSetterarrow::fs::__anon5c130cae0311::ObjectMetadataSetter908 static Setter DateTimeSetter(
909 void (ObjectRequest::*req_method)(Aws::Utils::DateTime&&)) {
910 return [req_method](const std::string& v, ObjectRequest* req) {
911 (req->*req_method)(
912 Aws::Utils::DateTime(v.data(), Aws::Utils::DateFormat::ISO_8601));
913 return Status::OK();
914 };
915 }
916
CannedACLSetterarrow::fs::__anon5c130cae0311::ObjectMetadataSetter917 static Setter CannedACLSetter() {
918 return [](const std::string& v, ObjectRequest* req) {
919 ARROW_ASSIGN_OR_RAISE(auto acl, ParseACL(v));
920 req->SetACL(acl);
921 return Status::OK();
922 };
923 }
924
ParseACLarrow::fs::__anon5c130cae0311::ObjectMetadataSetter925 static Result<S3Model::ObjectCannedACL> ParseACL(const std::string& v) {
926 if (v.empty()) {
927 return S3Model::ObjectCannedACL::NOT_SET;
928 }
929 auto acl = S3Model::ObjectCannedACLMapper::GetObjectCannedACLForName(ToAwsString(v));
930 if (acl == S3Model::ObjectCannedACL::NOT_SET) {
931 // XXX This actually never happens, as the AWS SDK dynamically
932 // expands the enum range using Aws::GetEnumOverflowContainer()
933 return Status::Invalid("Invalid S3 canned ACL: '", v, "'");
934 }
935 return acl;
936 }
937 };
938
939 template <typename ObjectRequest>
SetObjectMetadata(const std::shared_ptr<const KeyValueMetadata> & metadata,ObjectRequest * req)940 Status SetObjectMetadata(const std::shared_ptr<const KeyValueMetadata>& metadata,
941 ObjectRequest* req) {
942 static auto setters = ObjectMetadataSetter<ObjectRequest>::GetSetters();
943
944 DCHECK_NE(metadata, nullptr);
945 const auto& keys = metadata->keys();
946 const auto& values = metadata->values();
947
948 for (size_t i = 0; i < keys.size(); ++i) {
949 auto it = setters.find(keys[i]);
950 if (it != setters.end()) {
951 RETURN_NOT_OK(it->second(values[i], req));
952 }
953 }
954 return Status::OK();
955 }
956
957 // A RandomAccessFile that reads from a S3 object
958 class ObjectInputFile final : public io::RandomAccessFile {
959 public:
ObjectInputFile(std::shared_ptr<Aws::S3::S3Client> client,const io::IOContext & io_context,const S3Path & path,int64_t size=kNoSize)960 ObjectInputFile(std::shared_ptr<Aws::S3::S3Client> client,
961 const io::IOContext& io_context, const S3Path& path,
962 int64_t size = kNoSize)
963 : client_(std::move(client)),
964 io_context_(io_context),
965 path_(path),
966 content_length_(size) {}
967
Init()968 Status Init() {
969 // Issue a HEAD Object to get the content-length and ensure any
970 // errors (e.g. file not found) don't wait until the first Read() call.
971 if (content_length_ != kNoSize) {
972 DCHECK_GE(content_length_, 0);
973 return Status::OK();
974 }
975
976 S3Model::HeadObjectRequest req;
977 req.SetBucket(ToAwsString(path_.bucket));
978 req.SetKey(ToAwsString(path_.key));
979
980 auto outcome = client_->HeadObject(req);
981 if (!outcome.IsSuccess()) {
982 if (IsNotFound(outcome.GetError())) {
983 return PathNotFound(path_);
984 } else {
985 return ErrorToStatus(
986 std::forward_as_tuple("When reading information for key '", path_.key,
987 "' in bucket '", path_.bucket, "': "),
988 outcome.GetError());
989 }
990 }
991 content_length_ = outcome.GetResult().GetContentLength();
992 DCHECK_GE(content_length_, 0);
993 metadata_ = GetObjectMetadata(outcome.GetResult());
994 return Status::OK();
995 }
996
CheckClosed() const997 Status CheckClosed() const {
998 if (closed_) {
999 return Status::Invalid("Operation on closed stream");
1000 }
1001 return Status::OK();
1002 }
1003
CheckPosition(int64_t position,const char * action) const1004 Status CheckPosition(int64_t position, const char* action) const {
1005 if (position < 0) {
1006 return Status::Invalid("Cannot ", action, " from negative position");
1007 }
1008 if (position > content_length_) {
1009 return Status::IOError("Cannot ", action, " past end of file");
1010 }
1011 return Status::OK();
1012 }
1013
1014 // RandomAccessFile APIs
1015
ReadMetadata()1016 Result<std::shared_ptr<const KeyValueMetadata>> ReadMetadata() override {
1017 return metadata_;
1018 }
1019
ReadMetadataAsync(const io::IOContext & io_context)1020 Future<std::shared_ptr<const KeyValueMetadata>> ReadMetadataAsync(
1021 const io::IOContext& io_context) override {
1022 return metadata_;
1023 }
1024
Close()1025 Status Close() override {
1026 client_ = nullptr;
1027 closed_ = true;
1028 return Status::OK();
1029 }
1030
closed() const1031 bool closed() const override { return closed_; }
1032
Tell() const1033 Result<int64_t> Tell() const override {
1034 RETURN_NOT_OK(CheckClosed());
1035 return pos_;
1036 }
1037
GetSize()1038 Result<int64_t> GetSize() override {
1039 RETURN_NOT_OK(CheckClosed());
1040 return content_length_;
1041 }
1042
Seek(int64_t position)1043 Status Seek(int64_t position) override {
1044 RETURN_NOT_OK(CheckClosed());
1045 RETURN_NOT_OK(CheckPosition(position, "seek"));
1046
1047 pos_ = position;
1048 return Status::OK();
1049 }
1050
ReadAt(int64_t position,int64_t nbytes,void * out)1051 Result<int64_t> ReadAt(int64_t position, int64_t nbytes, void* out) override {
1052 RETURN_NOT_OK(CheckClosed());
1053 RETURN_NOT_OK(CheckPosition(position, "read"));
1054
1055 nbytes = std::min(nbytes, content_length_ - position);
1056 if (nbytes == 0) {
1057 return 0;
1058 }
1059
1060 // Read the desired range of bytes
1061 ARROW_ASSIGN_OR_RAISE(S3Model::GetObjectResult result,
1062 GetObjectRange(client_.get(), path_, position, nbytes, out));
1063
1064 auto& stream = result.GetBody();
1065 stream.ignore(nbytes);
1066 // NOTE: the stream is a stringstream by default, there is no actual error
1067 // to check for. However, stream.fail() may return true if EOF is reached.
1068 return stream.gcount();
1069 }
1070
ReadAt(int64_t position,int64_t nbytes)1071 Result<std::shared_ptr<Buffer>> ReadAt(int64_t position, int64_t nbytes) override {
1072 RETURN_NOT_OK(CheckClosed());
1073 RETURN_NOT_OK(CheckPosition(position, "read"));
1074
1075 // No need to allocate more than the remaining number of bytes
1076 nbytes = std::min(nbytes, content_length_ - position);
1077
1078 ARROW_ASSIGN_OR_RAISE(auto buf, AllocateResizableBuffer(nbytes, io_context_.pool()));
1079 if (nbytes > 0) {
1080 ARROW_ASSIGN_OR_RAISE(int64_t bytes_read,
1081 ReadAt(position, nbytes, buf->mutable_data()));
1082 DCHECK_LE(bytes_read, nbytes);
1083 RETURN_NOT_OK(buf->Resize(bytes_read));
1084 }
1085 return std::move(buf);
1086 }
1087
Read(int64_t nbytes,void * out)1088 Result<int64_t> Read(int64_t nbytes, void* out) override {
1089 ARROW_ASSIGN_OR_RAISE(int64_t bytes_read, ReadAt(pos_, nbytes, out));
1090 pos_ += bytes_read;
1091 return bytes_read;
1092 }
1093
Read(int64_t nbytes)1094 Result<std::shared_ptr<Buffer>> Read(int64_t nbytes) override {
1095 ARROW_ASSIGN_OR_RAISE(auto buffer, ReadAt(pos_, nbytes));
1096 pos_ += buffer->size();
1097 return std::move(buffer);
1098 }
1099
1100 protected:
1101 std::shared_ptr<Aws::S3::S3Client> client_;
1102 const io::IOContext io_context_;
1103 S3Path path_;
1104
1105 bool closed_ = false;
1106 int64_t pos_ = 0;
1107 int64_t content_length_ = kNoSize;
1108 std::shared_ptr<const KeyValueMetadata> metadata_;
1109 };
1110
// Smallest allowed size for every part of a multipart upload except the last.
// The AWS documentation says "5 MB" without saying whether MB or MiB is meant,
// so the larger of the two (5 MiB) is used to stay on the safe side.
// (see https://docs.aws.amazon.com/AmazonS3/latest/API/mpUploadUploadPart.html)
static constexpr int64_t kMinimumPartUpload = int64_t{5} << 20;
1116
1117 // An OutputStream that writes to a S3 object
1118 class ObjectOutputStream final : public io::OutputStream {
1119 protected:
1120 struct UploadState;
1121
1122 public:
ObjectOutputStream(std::shared_ptr<S3Client> client,const io::IOContext & io_context,const S3Path & path,const S3Options & options,const std::shared_ptr<const KeyValueMetadata> & metadata)1123 ObjectOutputStream(std::shared_ptr<S3Client> client, const io::IOContext& io_context,
1124 const S3Path& path, const S3Options& options,
1125 const std::shared_ptr<const KeyValueMetadata>& metadata)
1126 : client_(std::move(client)),
1127 io_context_(io_context),
1128 path_(path),
1129 metadata_(metadata),
1130 default_metadata_(options.default_metadata),
1131 background_writes_(options.background_writes) {}
1132
~ObjectOutputStream()1133 ~ObjectOutputStream() override {
1134 // For compliance with the rest of the IO stack, Close rather than Abort,
1135 // even though it may be more expensive.
1136 io::internal::CloseFromDestructor(this);
1137 }
1138
Init()1139 Status Init() {
1140 // Initiate the multi-part upload
1141 S3Model::CreateMultipartUploadRequest req;
1142 req.SetBucket(ToAwsString(path_.bucket));
1143 req.SetKey(ToAwsString(path_.key));
1144 if (metadata_ && metadata_->size() != 0) {
1145 RETURN_NOT_OK(SetObjectMetadata(metadata_, &req));
1146 } else if (default_metadata_ && default_metadata_->size() != 0) {
1147 RETURN_NOT_OK(SetObjectMetadata(default_metadata_, &req));
1148 }
1149
1150 auto outcome = client_->CreateMultipartUpload(req);
1151 if (!outcome.IsSuccess()) {
1152 return ErrorToStatus(
1153 std::forward_as_tuple("When initiating multiple part upload for key '",
1154 path_.key, "' in bucket '", path_.bucket, "': "),
1155 outcome.GetError());
1156 }
1157 upload_id_ = outcome.GetResult().GetUploadId();
1158 upload_state_ = std::make_shared<UploadState>();
1159 closed_ = false;
1160 return Status::OK();
1161 }
1162
Abort()1163 Status Abort() override {
1164 if (closed_) {
1165 return Status::OK();
1166 }
1167
1168 S3Model::AbortMultipartUploadRequest req;
1169 req.SetBucket(ToAwsString(path_.bucket));
1170 req.SetKey(ToAwsString(path_.key));
1171 req.SetUploadId(upload_id_);
1172
1173 auto outcome = client_->AbortMultipartUpload(req);
1174 if (!outcome.IsSuccess()) {
1175 return ErrorToStatus(
1176 std::forward_as_tuple("When aborting multiple part upload for key '", path_.key,
1177 "' in bucket '", path_.bucket, "': "),
1178 outcome.GetError());
1179 }
1180 current_part_.reset();
1181 client_ = nullptr;
1182 closed_ = true;
1183 return Status::OK();
1184 }
1185
1186 // OutputStream interface
1187
Close()1188 Status Close() override {
1189 if (closed_) {
1190 return Status::OK();
1191 }
1192
1193 if (current_part_) {
1194 // Upload last part
1195 RETURN_NOT_OK(CommitCurrentPart());
1196 }
1197
1198 // S3 mandates at least one part, upload an empty one if necessary
1199 if (part_number_ == 1) {
1200 RETURN_NOT_OK(UploadPart("", 0));
1201 }
1202
1203 // Wait for in-progress uploads to finish (if async writes are enabled)
1204 RETURN_NOT_OK(Flush());
1205
1206 // At this point, all part uploads have finished successfully
1207 DCHECK_GT(part_number_, 1);
1208 DCHECK_EQ(upload_state_->completed_parts.size(),
1209 static_cast<size_t>(part_number_ - 1));
1210
1211 S3Model::CompletedMultipartUpload completed_upload;
1212 completed_upload.SetParts(upload_state_->completed_parts);
1213 S3Model::CompleteMultipartUploadRequest req;
1214 req.SetBucket(ToAwsString(path_.bucket));
1215 req.SetKey(ToAwsString(path_.key));
1216 req.SetUploadId(upload_id_);
1217 req.SetMultipartUpload(std::move(completed_upload));
1218
1219 auto outcome = client_->CompleteMultipartUploadWithErrorFixup(std::move(req));
1220 if (!outcome.IsSuccess()) {
1221 return ErrorToStatus(
1222 std::forward_as_tuple("When completing multiple part upload for key '",
1223 path_.key, "' in bucket '", path_.bucket, "': "),
1224 outcome.GetError());
1225 }
1226
1227 client_ = nullptr;
1228 closed_ = true;
1229 return Status::OK();
1230 }
1231
closed() const1232 bool closed() const override { return closed_; }
1233
Tell() const1234 Result<int64_t> Tell() const override {
1235 if (closed_) {
1236 return Status::Invalid("Operation on closed stream");
1237 }
1238 return pos_;
1239 }
1240
Write(const std::shared_ptr<Buffer> & buffer)1241 Status Write(const std::shared_ptr<Buffer>& buffer) override {
1242 return DoWrite(buffer->data(), buffer->size(), buffer);
1243 }
1244
Write(const void * data,int64_t nbytes)1245 Status Write(const void* data, int64_t nbytes) override {
1246 return DoWrite(data, nbytes);
1247 }
1248
DoWrite(const void * data,int64_t nbytes,std::shared_ptr<Buffer> owned_buffer=nullptr)1249 Status DoWrite(const void* data, int64_t nbytes,
1250 std::shared_ptr<Buffer> owned_buffer = nullptr) {
1251 if (closed_) {
1252 return Status::Invalid("Operation on closed stream");
1253 }
1254
1255 if (!current_part_ && nbytes >= part_upload_threshold_) {
1256 // No current part and data large enough, upload it directly
1257 // (without copying if the buffer is owned)
1258 RETURN_NOT_OK(UploadPart(data, nbytes, owned_buffer));
1259 pos_ += nbytes;
1260 return Status::OK();
1261 }
1262 // Can't upload data on its own, need to buffer it
1263 if (!current_part_) {
1264 ARROW_ASSIGN_OR_RAISE(
1265 current_part_,
1266 io::BufferOutputStream::Create(part_upload_threshold_, io_context_.pool()));
1267 current_part_size_ = 0;
1268 }
1269 RETURN_NOT_OK(current_part_->Write(data, nbytes));
1270 pos_ += nbytes;
1271 current_part_size_ += nbytes;
1272
1273 if (current_part_size_ >= part_upload_threshold_) {
1274 // Current part large enough, upload it
1275 RETURN_NOT_OK(CommitCurrentPart());
1276 }
1277
1278 return Status::OK();
1279 }
1280
Flush()1281 Status Flush() override {
1282 if (closed_) {
1283 return Status::Invalid("Operation on closed stream");
1284 }
1285 // Wait for background writes to finish
1286 std::unique_lock<std::mutex> lock(upload_state_->mutex);
1287 upload_state_->cv.wait(lock,
1288 [this]() { return upload_state_->parts_in_progress == 0; });
1289 return upload_state_->status;
1290 }
1291
1292 // Upload-related helpers
1293
CommitCurrentPart()1294 Status CommitCurrentPart() {
1295 ARROW_ASSIGN_OR_RAISE(auto buf, current_part_->Finish());
1296 current_part_.reset();
1297 current_part_size_ = 0;
1298 return UploadPart(buf);
1299 }
1300
UploadPart(std::shared_ptr<Buffer> buffer)1301 Status UploadPart(std::shared_ptr<Buffer> buffer) {
1302 return UploadPart(buffer->data(), buffer->size(), buffer);
1303 }
1304
UploadPart(const void * data,int64_t nbytes,std::shared_ptr<Buffer> owned_buffer=nullptr)1305 Status UploadPart(const void* data, int64_t nbytes,
1306 std::shared_ptr<Buffer> owned_buffer = nullptr) {
1307 S3Model::UploadPartRequest req;
1308 req.SetBucket(ToAwsString(path_.bucket));
1309 req.SetKey(ToAwsString(path_.key));
1310 req.SetUploadId(upload_id_);
1311 req.SetPartNumber(part_number_);
1312 req.SetContentLength(nbytes);
1313
1314 if (!background_writes_) {
1315 req.SetBody(std::make_shared<StringViewStream>(data, nbytes));
1316 auto outcome = client_->UploadPart(req);
1317 if (!outcome.IsSuccess()) {
1318 return UploadPartError(req, outcome);
1319 } else {
1320 AddCompletedPart(upload_state_, part_number_, outcome.GetResult());
1321 }
1322 } else {
1323 // If the data isn't owned, make an immutable copy for the lifetime of the closure
1324 if (owned_buffer == nullptr) {
1325 ARROW_ASSIGN_OR_RAISE(owned_buffer, AllocateBuffer(nbytes, io_context_.pool()));
1326 memcpy(owned_buffer->mutable_data(), data, nbytes);
1327 } else {
1328 DCHECK_EQ(data, owned_buffer->data());
1329 DCHECK_EQ(nbytes, owned_buffer->size());
1330 }
1331 req.SetBody(
1332 std::make_shared<StringViewStream>(owned_buffer->data(), owned_buffer->size()));
1333
1334 {
1335 std::unique_lock<std::mutex> lock(upload_state_->mutex);
1336 ++upload_state_->parts_in_progress;
1337 }
1338 auto client = client_;
1339 ARROW_ASSIGN_OR_RAISE(auto fut, SubmitIO(io_context_, [client, req]() {
1340 return client->UploadPart(req);
1341 }));
1342 // The closure keeps the buffer and the upload state alive
1343 auto state = upload_state_;
1344 auto part_number = part_number_;
1345 auto handler = [owned_buffer, state, part_number,
1346 req](const Result<S3Model::UploadPartOutcome>& result) -> void {
1347 HandleUploadOutcome(state, part_number, req, result);
1348 };
1349 fut.AddCallback(std::move(handler));
1350 }
1351
1352 ++part_number_;
1353 // With up to 10000 parts in an upload (S3 limit), a stream writing chunks
1354 // of exactly 5MB would be limited to 50GB total. To avoid that, we bump
1355 // the upload threshold every 100 parts. So the pattern is:
1356 // - part 1 to 99: 5MB threshold
1357 // - part 100 to 199: 10MB threshold
1358 // - part 200 to 299: 15MB threshold
1359 // ...
1360 // - part 9900 to 9999: 500MB threshold
1361 // So the total size limit is 2475000MB or ~2.4TB, while keeping manageable
1362 // chunk sizes and avoiding too much buffering in the common case of a small-ish
1363 // stream. If the limit's not enough, we can revisit.
1364 if (part_number_ % 100 == 0) {
1365 part_upload_threshold_ += kMinimumPartUpload;
1366 }
1367
1368 return Status::OK();
1369 }
1370
HandleUploadOutcome(const std::shared_ptr<UploadState> & state,int part_number,const S3Model::UploadPartRequest & req,const Result<S3Model::UploadPartOutcome> & result)1371 static void HandleUploadOutcome(const std::shared_ptr<UploadState>& state,
1372 int part_number, const S3Model::UploadPartRequest& req,
1373 const Result<S3Model::UploadPartOutcome>& result) {
1374 std::unique_lock<std::mutex> lock(state->mutex);
1375 if (!result.ok()) {
1376 state->status &= result.status();
1377 } else {
1378 const auto& outcome = *result;
1379 if (!outcome.IsSuccess()) {
1380 state->status &= UploadPartError(req, outcome);
1381 } else {
1382 AddCompletedPart(state, part_number, outcome.GetResult());
1383 }
1384 }
1385 // Notify completion
1386 if (--state->parts_in_progress == 0) {
1387 state->cv.notify_all();
1388 }
1389 }
1390
AddCompletedPart(const std::shared_ptr<UploadState> & state,int part_number,const S3Model::UploadPartResult & result)1391 static void AddCompletedPart(const std::shared_ptr<UploadState>& state, int part_number,
1392 const S3Model::UploadPartResult& result) {
1393 S3Model::CompletedPart part;
1394 // Append ETag and part number for this uploaded part
1395 // (will be needed for upload completion in Close())
1396 part.SetPartNumber(part_number);
1397 part.SetETag(result.GetETag());
1398 int slot = part_number - 1;
1399 if (state->completed_parts.size() <= static_cast<size_t>(slot)) {
1400 state->completed_parts.resize(slot + 1);
1401 }
1402 DCHECK(!state->completed_parts[slot].PartNumberHasBeenSet());
1403 state->completed_parts[slot] = std::move(part);
1404 }
1405
UploadPartError(const S3Model::UploadPartRequest & req,const S3Model::UploadPartOutcome & outcome)1406 static Status UploadPartError(const S3Model::UploadPartRequest& req,
1407 const S3Model::UploadPartOutcome& outcome) {
1408 return ErrorToStatus(
1409 std::forward_as_tuple("When uploading part for key '", req.GetKey(),
1410 "' in bucket '", req.GetBucket(), "': "),
1411 outcome.GetError());
1412 }
1413
1414 protected:
1415 std::shared_ptr<S3Client> client_;
1416 const io::IOContext io_context_;
1417 const S3Path path_;
1418 const std::shared_ptr<const KeyValueMetadata> metadata_;
1419 const std::shared_ptr<const KeyValueMetadata> default_metadata_;
1420 const bool background_writes_;
1421
1422 Aws::String upload_id_;
1423 bool closed_ = true;
1424 int64_t pos_ = 0;
1425 int32_t part_number_ = 1;
1426 std::shared_ptr<io::BufferOutputStream> current_part_;
1427 int64_t current_part_size_ = 0;
1428 int64_t part_upload_threshold_ = kMinimumPartUpload;
1429
1430 // This struct is kept alive through background writes to avoid problems
1431 // in the completion handler.
1432 struct UploadState {
1433 std::mutex mutex;
1434 std::condition_variable cv;
1435 Aws::Vector<S3Model::CompletedPart> completed_parts;
1436 int64_t parts_in_progress = 0;
1437 Status status;
1438 };
1439 std::shared_ptr<UploadState> upload_state_;
1440 };
1441
1442 // This function assumes info->path() is already set
FileObjectToInfo(const S3Model::HeadObjectResult & obj,FileInfo * info)1443 void FileObjectToInfo(const S3Model::HeadObjectResult& obj, FileInfo* info) {
1444 info->set_type(FileType::File);
1445 info->set_size(static_cast<int64_t>(obj.GetContentLength()));
1446 info->set_mtime(FromAwsDatetime(obj.GetLastModified()));
1447 }
1448
FileObjectToInfo(const S3Model::Object & obj,FileInfo * info)1449 void FileObjectToInfo(const S3Model::Object& obj, FileInfo* info) {
1450 info->set_type(FileType::File);
1451 info->set_size(static_cast<int64_t>(obj.GetSize()));
1452 info->set_mtime(FromAwsDatetime(obj.GetLastModified()));
1453 }
1454
1455 struct TreeWalker : public std::enable_shared_from_this<TreeWalker> {
1456 using ResultHandler = std::function<Status(const std::string& prefix,
1457 const S3Model::ListObjectsV2Result&)>;
1458 using ErrorHandler = std::function<Status(const AWSError<S3Errors>& error)>;
1459 using RecursionHandler = std::function<Result<bool>(int32_t nesting_depth)>;
1460
1461 std::shared_ptr<Aws::S3::S3Client> client_;
1462 io::IOContext io_context_;
1463 const std::string bucket_;
1464 const std::string base_dir_;
1465 const int32_t max_keys_;
1466 const ResultHandler result_handler_;
1467 const ErrorHandler error_handler_;
1468 const RecursionHandler recursion_handler_;
1469
1470 template <typename... Args>
Walkarrow::fs::__anon5c130cae0311::TreeWalker1471 static Status Walk(Args&&... args) {
1472 return WalkAsync(std::forward<Args>(args)...).status();
1473 }
1474
1475 template <typename... Args>
WalkAsyncarrow::fs::__anon5c130cae0311::TreeWalker1476 static Future<> WalkAsync(Args&&... args) {
1477 auto self = std::make_shared<TreeWalker>(std::forward<Args>(args)...);
1478 return self->DoWalk();
1479 }
1480
TreeWalkerarrow::fs::__anon5c130cae0311::TreeWalker1481 TreeWalker(std::shared_ptr<Aws::S3::S3Client> client, io::IOContext io_context,
1482 std::string bucket, std::string base_dir, int32_t max_keys,
1483 ResultHandler result_handler, ErrorHandler error_handler,
1484 RecursionHandler recursion_handler)
1485 : client_(std::move(client)),
1486 io_context_(io_context),
1487 bucket_(std::move(bucket)),
1488 base_dir_(std::move(base_dir)),
1489 max_keys_(max_keys),
1490 result_handler_(std::move(result_handler)),
1491 error_handler_(std::move(error_handler)),
1492 recursion_handler_(std::move(recursion_handler)) {}
1493
1494 private:
1495 std::shared_ptr<TaskGroup> task_group_;
1496 std::mutex mutex_;
1497
DoWalkarrow::fs::__anon5c130cae0311::TreeWalker1498 Future<> DoWalk() {
1499 task_group_ =
1500 TaskGroup::MakeThreaded(io_context_.executor(), io_context_.stop_token());
1501 WalkChild(base_dir_, /*nesting_depth=*/0);
1502 // When this returns, ListObjectsV2 tasks either have finished or will exit early
1503 return task_group_->FinishAsync();
1504 }
1505
okarrow::fs::__anon5c130cae0311::TreeWalker1506 bool ok() const { return task_group_->ok(); }
1507
1508 struct ListObjectsV2Handler {
1509 std::shared_ptr<TreeWalker> walker;
1510 std::string prefix;
1511 int32_t nesting_depth;
1512 S3Model::ListObjectsV2Request req;
1513
operator ()arrow::fs::__anon5c130cae0311::TreeWalker::ListObjectsV2Handler1514 Status operator()(const Result<S3Model::ListObjectsV2Outcome>& result) {
1515 // Serialize calls to operation-specific handlers
1516 if (!walker->ok()) {
1517 // Early exit: avoid executing handlers if DoWalk() returned
1518 return Status::OK();
1519 }
1520 if (!result.ok()) {
1521 return result.status();
1522 }
1523 const auto& outcome = *result;
1524 if (!outcome.IsSuccess()) {
1525 {
1526 std::lock_guard<std::mutex> guard(walker->mutex_);
1527 return walker->error_handler_(outcome.GetError());
1528 }
1529 }
1530 return HandleResult(outcome.GetResult());
1531 }
1532
SpawnListObjectsV2arrow::fs::__anon5c130cae0311::TreeWalker::ListObjectsV2Handler1533 void SpawnListObjectsV2() {
1534 auto cb = *this;
1535 walker->task_group_->Append([cb]() mutable {
1536 Result<S3Model::ListObjectsV2Outcome> result =
1537 cb.walker->client_->ListObjectsV2(cb.req);
1538 return cb(result);
1539 });
1540 }
1541
HandleResultarrow::fs::__anon5c130cae0311::TreeWalker::ListObjectsV2Handler1542 Status HandleResult(const S3Model::ListObjectsV2Result& result) {
1543 bool recurse;
1544 {
1545 // Only one thread should be running result_handler_/recursion_handler_ at a time
1546 std::lock_guard<std::mutex> guard(walker->mutex_);
1547 recurse = result.GetCommonPrefixes().size() > 0;
1548 if (recurse) {
1549 ARROW_ASSIGN_OR_RAISE(auto maybe_recurse,
1550 walker->recursion_handler_(nesting_depth + 1));
1551 recurse &= maybe_recurse;
1552 }
1553 RETURN_NOT_OK(walker->result_handler_(prefix, result));
1554 }
1555 if (recurse) {
1556 walker->WalkChildren(result, nesting_depth + 1);
1557 }
1558 // If the result was truncated, issue a continuation request to get
1559 // further directory entries.
1560 if (result.GetIsTruncated()) {
1561 DCHECK(!result.GetNextContinuationToken().empty());
1562 req.SetContinuationToken(result.GetNextContinuationToken());
1563 SpawnListObjectsV2();
1564 }
1565 return Status::OK();
1566 }
1567
Startarrow::fs::__anon5c130cae0311::TreeWalker::ListObjectsV2Handler1568 void Start() {
1569 req.SetBucket(ToAwsString(walker->bucket_));
1570 if (!prefix.empty()) {
1571 req.SetPrefix(ToAwsString(prefix) + kSep);
1572 }
1573 req.SetDelimiter(Aws::String() + kSep);
1574 req.SetMaxKeys(walker->max_keys_);
1575 SpawnListObjectsV2();
1576 }
1577 };
1578
WalkChildarrow::fs::__anon5c130cae0311::TreeWalker1579 void WalkChild(std::string key, int32_t nesting_depth) {
1580 ListObjectsV2Handler handler{shared_from_this(), std::move(key), nesting_depth, {}};
1581 handler.Start();
1582 }
1583
WalkChildrenarrow::fs::__anon5c130cae0311::TreeWalker1584 void WalkChildren(const S3Model::ListObjectsV2Result& result, int32_t nesting_depth) {
1585 for (const auto& prefix : result.GetCommonPrefixes()) {
1586 const auto child_key =
1587 internal::RemoveTrailingSlash(FromAwsString(prefix.GetPrefix()));
1588 WalkChild(std::string{child_key}, nesting_depth);
1589 }
1590 }
1591
1592 friend struct ListObjectsV2Handler;
1593 };
1594
1595 } // namespace
1596
1597 // -----------------------------------------------------------------------
1598 // S3 filesystem implementation
1599
1600 class S3FileSystem::Impl : public std::enable_shared_from_this<S3FileSystem::Impl> {
1601 public:
1602 ClientBuilder builder_;
1603 io::IOContext io_context_;
1604 std::shared_ptr<S3Client> client_;
1605 util::optional<S3Backend> backend_;
1606
1607 const int32_t kListObjectsMaxKeys = 1000;
1608 // At most 1000 keys per multiple-delete request
1609 const int32_t kMultipleDeleteMaxKeys = 1000;
1610 // Limit recursing depth, since a recursion bomb can be created
1611 const int32_t kMaxNestingDepth = 100;
1612
Impl(S3Options options,io::IOContext io_context)1613 explicit Impl(S3Options options, io::IOContext io_context)
1614 : builder_(std::move(options)), io_context_(io_context) {}
1615
Init()1616 Status Init() { return builder_.BuildClient().Value(&client_); }
1617
options() const1618 const S3Options& options() const { return builder_.options(); }
1619
region() const1620 std::string region() const {
1621 return std::string(FromAwsString(builder_.config().region));
1622 }
1623
1624 template <typename Error>
SaveBackend(const Aws::Client::AWSError<Error> & error)1625 void SaveBackend(const Aws::Client::AWSError<Error>& error) {
1626 if (!backend_ || *backend_ == S3Backend::Other) {
1627 backend_ = DetectS3Backend(error);
1628 }
1629 }
1630
1631 // Tests to see if a bucket exists
BucketExists(const std::string & bucket)1632 Result<bool> BucketExists(const std::string& bucket) {
1633 S3Model::HeadBucketRequest req;
1634 req.SetBucket(ToAwsString(bucket));
1635
1636 auto outcome = client_->HeadBucket(req);
1637 if (!outcome.IsSuccess()) {
1638 if (!IsNotFound(outcome.GetError())) {
1639 return ErrorToStatus(std::forward_as_tuple(
1640 "When testing for existence of bucket '", bucket, "': "),
1641 outcome.GetError());
1642 }
1643 return false;
1644 }
1645 return true;
1646 }
1647
1648 // Create a bucket. Successful if bucket already exists.
CreateBucket(const std::string & bucket)1649 Status CreateBucket(const std::string& bucket) {
1650 S3Model::CreateBucketConfiguration config;
1651 S3Model::CreateBucketRequest req;
1652 auto _region = region();
1653 // AWS S3 treats the us-east-1 differently than other regions
1654 // https://docs.aws.amazon.com/cli/latest/reference/s3api/create-bucket.html
1655 if (_region != "us-east-1") {
1656 config.SetLocationConstraint(
1657 S3Model::BucketLocationConstraintMapper::GetBucketLocationConstraintForName(
1658 ToAwsString(_region)));
1659 }
1660 req.SetBucket(ToAwsString(bucket));
1661 req.SetCreateBucketConfiguration(config);
1662
1663 auto outcome = client_->CreateBucket(req);
1664 if (!outcome.IsSuccess() && !IsAlreadyExists(outcome.GetError())) {
1665 return ErrorToStatus(std::forward_as_tuple("When creating bucket '", bucket, "': "),
1666 outcome.GetError());
1667 }
1668 return Status::OK();
1669 }
1670
1671 // Create an object with empty contents. Successful if object already exists.
CreateEmptyObject(const std::string & bucket,const std::string & key)1672 Status CreateEmptyObject(const std::string& bucket, const std::string& key) {
1673 S3Model::PutObjectRequest req;
1674 req.SetBucket(ToAwsString(bucket));
1675 req.SetKey(ToAwsString(key));
1676 return OutcomeToStatus(
1677 std::forward_as_tuple("When creating key '", key, "' in bucket '", bucket, "': "),
1678 client_->PutObject(req));
1679 }
1680
CreateEmptyDir(const std::string & bucket,const std::string & key)1681 Status CreateEmptyDir(const std::string& bucket, const std::string& key) {
1682 DCHECK(!key.empty());
1683 return CreateEmptyObject(bucket, key + kSep);
1684 }
1685
DeleteObject(const std::string & bucket,const std::string & key)1686 Status DeleteObject(const std::string& bucket, const std::string& key) {
1687 S3Model::DeleteObjectRequest req;
1688 req.SetBucket(ToAwsString(bucket));
1689 req.SetKey(ToAwsString(key));
1690 return OutcomeToStatus(
1691 std::forward_as_tuple("When delete key '", key, "' in bucket '", bucket, "': "),
1692 client_->DeleteObject(req));
1693 }
1694
CopyObject(const S3Path & src_path,const S3Path & dest_path)1695 Status CopyObject(const S3Path& src_path, const S3Path& dest_path) {
1696 S3Model::CopyObjectRequest req;
1697 req.SetBucket(ToAwsString(dest_path.bucket));
1698 req.SetKey(ToAwsString(dest_path.key));
1699 // ARROW-13048: Copy source "Must be URL-encoded" according to AWS SDK docs.
1700 // However at least in 1.8 and 1.9 the SDK URL-encodes the path for you
1701 req.SetCopySource(src_path.ToAwsString());
1702 return OutcomeToStatus(
1703 std::forward_as_tuple("When copying key '", src_path.key, "' in bucket '",
1704 src_path.bucket, "' to key '", dest_path.key,
1705 "' in bucket '", dest_path.bucket, "': "),
1706 client_->CopyObject(req));
1707 }
1708
1709 // On Minio, an empty "directory" doesn't satisfy the same API requests as
1710 // a non-empty "directory". This is a Minio-specific quirk, but we need
1711 // to handle it for unit testing.
1712
IsEmptyDirectory(const std::string & bucket,const std::string & key,bool * out)1713 Status IsEmptyDirectory(const std::string& bucket, const std::string& key, bool* out) {
1714 S3Model::HeadObjectRequest req;
1715 req.SetBucket(ToAwsString(bucket));
1716 if (backend_ && *backend_ == S3Backend::Minio) {
1717 // Minio wants a slash at the end, Amazon doesn't
1718 req.SetKey(ToAwsString(key) + kSep);
1719 } else {
1720 req.SetKey(ToAwsString(key));
1721 }
1722
1723 auto outcome = client_->HeadObject(req);
1724 if (outcome.IsSuccess()) {
1725 *out = true;
1726 return Status::OK();
1727 }
1728 if (!backend_) {
1729 SaveBackend(outcome.GetError());
1730 DCHECK(backend_);
1731 if (*backend_ == S3Backend::Minio) {
1732 // Try again with separator-terminated key (see above)
1733 return IsEmptyDirectory(bucket, key, out);
1734 }
1735 }
1736 if (IsNotFound(outcome.GetError())) {
1737 *out = false;
1738 return Status::OK();
1739 }
1740 return ErrorToStatus(std::forward_as_tuple("When reading information for key '", key,
1741 "' in bucket '", bucket, "': "),
1742 outcome.GetError());
1743 }
1744
IsEmptyDirectory(const S3Path & path,bool * out)1745 Status IsEmptyDirectory(const S3Path& path, bool* out) {
1746 return IsEmptyDirectory(path.bucket, path.key, out);
1747 }
1748
IsNonEmptyDirectory(const S3Path & path,bool * out)1749 Status IsNonEmptyDirectory(const S3Path& path, bool* out) {
1750 S3Model::ListObjectsV2Request req;
1751 req.SetBucket(ToAwsString(path.bucket));
1752 req.SetPrefix(ToAwsString(path.key) + kSep);
1753 req.SetDelimiter(Aws::String() + kSep);
1754 req.SetMaxKeys(1);
1755 auto outcome = client_->ListObjectsV2(req);
1756 if (outcome.IsSuccess()) {
1757 *out = outcome.GetResult().GetKeyCount() > 0;
1758 return Status::OK();
1759 }
1760 if (IsNotFound(outcome.GetError())) {
1761 *out = false;
1762 return Status::OK();
1763 }
1764 return ErrorToStatus(
1765 std::forward_as_tuple("When listing objects under key '", path.key,
1766 "' in bucket '", path.bucket, "': "),
1767 outcome.GetError());
1768 }
1769
CheckNestingDepth(int32_t nesting_depth)1770 Status CheckNestingDepth(int32_t nesting_depth) {
1771 if (nesting_depth >= kMaxNestingDepth) {
1772 return Status::IOError("S3 filesystem tree exceeds maximum nesting depth (",
1773 kMaxNestingDepth, ")");
1774 }
1775 return Status::OK();
1776 }
1777
1778 // A helper class for Walk and WalkAsync
1779 struct FileInfoCollector {
FileInfoCollectorarrow::fs::S3FileSystem::Impl::FileInfoCollector1780 FileInfoCollector(std::string bucket, std::string key, const FileSelector& select)
1781 : bucket(std::move(bucket)),
1782 key(std::move(key)),
1783 allow_not_found(select.allow_not_found) {}
1784
Collectarrow::fs::S3FileSystem::Impl::FileInfoCollector1785 Status Collect(const std::string& prefix, const S3Model::ListObjectsV2Result& result,
1786 std::vector<FileInfo>* out) {
1787 // Walk "directories"
1788 for (const auto& child_prefix : result.GetCommonPrefixes()) {
1789 is_empty = false;
1790 const auto child_key =
1791 internal::RemoveTrailingSlash(FromAwsString(child_prefix.GetPrefix()));
1792 std::stringstream child_path;
1793 child_path << bucket << kSep << child_key;
1794 FileInfo info;
1795 info.set_path(child_path.str());
1796 info.set_type(FileType::Directory);
1797 out->push_back(std::move(info));
1798 }
1799 // Walk "files"
1800 for (const auto& obj : result.GetContents()) {
1801 is_empty = false;
1802 FileInfo info;
1803 const auto child_key = internal::RemoveTrailingSlash(FromAwsString(obj.GetKey()));
1804 if (child_key == util::string_view(prefix)) {
1805 // Amazon can return the "directory" key itself as part of the results, skip
1806 continue;
1807 }
1808 std::stringstream child_path;
1809 child_path << bucket << kSep << child_key;
1810 info.set_path(child_path.str());
1811 FileObjectToInfo(obj, &info);
1812 out->push_back(std::move(info));
1813 }
1814 return Status::OK();
1815 }
1816
Finisharrow::fs::S3FileSystem::Impl::FileInfoCollector1817 Status Finish(Impl* impl) {
1818 // If no contents were found, perhaps it's an empty "directory",
1819 // or perhaps it's a nonexistent entry. Check.
1820 if (is_empty && !allow_not_found) {
1821 bool is_actually_empty;
1822 RETURN_NOT_OK(impl->IsEmptyDirectory(bucket, key, &is_actually_empty));
1823 if (!is_actually_empty) {
1824 return PathNotFound(bucket, key);
1825 }
1826 }
1827 return Status::OK();
1828 }
1829
1830 std::string bucket;
1831 std::string key;
1832 bool allow_not_found;
1833 bool is_empty = true;
1834 };
1835
1836 // Workhorse for GetFileInfo(FileSelector...)
Walk(const FileSelector & select,const std::string & bucket,const std::string & key,std::vector<FileInfo> * out)1837 Status Walk(const FileSelector& select, const std::string& bucket,
1838 const std::string& key, std::vector<FileInfo>* out) {
1839 FileInfoCollector collector(bucket, key, select);
1840
1841 auto handle_error = [&](const AWSError<S3Errors>& error) -> Status {
1842 if (select.allow_not_found && IsNotFound(error)) {
1843 return Status::OK();
1844 }
1845 return ErrorToStatus(std::forward_as_tuple("When listing objects under key '", key,
1846 "' in bucket '", bucket, "': "),
1847 error);
1848 };
1849
1850 auto handle_recursion = [&](int32_t nesting_depth) -> Result<bool> {
1851 RETURN_NOT_OK(CheckNestingDepth(nesting_depth));
1852 return select.recursive && nesting_depth <= select.max_recursion;
1853 };
1854
1855 auto handle_results = [&](const std::string& prefix,
1856 const S3Model::ListObjectsV2Result& result) -> Status {
1857 return collector.Collect(prefix, result, out);
1858 };
1859
1860 RETURN_NOT_OK(TreeWalker::Walk(client_, io_context_, bucket, key, kListObjectsMaxKeys,
1861 handle_results, handle_error, handle_recursion));
1862
1863 // If no contents were found, perhaps it's an empty "directory",
1864 // or perhaps it's a nonexistent entry. Check.
1865 RETURN_NOT_OK(collector.Finish(this));
1866 // Sort results for convenience, since they can come massively out of order
1867 std::sort(out->begin(), out->end(), FileInfo::ByPath{});
1868 return Status::OK();
1869 }
1870
1871 // Workhorse for GetFileInfoGenerator(FileSelector...)
WalkAsync(const FileSelector & select,const std::string & bucket,const std::string & key)1872 FileInfoGenerator WalkAsync(const FileSelector& select, const std::string& bucket,
1873 const std::string& key) {
1874 PushGenerator<std::vector<FileInfo>> gen;
1875 auto producer = gen.producer();
1876 auto collector = std::make_shared<FileInfoCollector>(bucket, key, select);
1877 auto self = shared_from_this();
1878
1879 auto handle_error = [select, bucket, key](const AWSError<S3Errors>& error) -> Status {
1880 if (select.allow_not_found && IsNotFound(error)) {
1881 return Status::OK();
1882 }
1883 return ErrorToStatus(std::forward_as_tuple("When listing objects under key '", key,
1884 "' in bucket '", bucket, "': "),
1885 error);
1886 };
1887
1888 auto handle_recursion = [producer, select,
1889 self](int32_t nesting_depth) -> Result<bool> {
1890 if (producer.is_closed()) {
1891 return false;
1892 }
1893 RETURN_NOT_OK(self->CheckNestingDepth(nesting_depth));
1894 return select.recursive && nesting_depth <= select.max_recursion;
1895 };
1896
1897 auto handle_results =
1898 [collector, producer](
1899 const std::string& prefix,
1900 const S3Model::ListObjectsV2Result& result) mutable -> Status {
1901 std::vector<FileInfo> out;
1902 RETURN_NOT_OK(collector->Collect(prefix, result, &out));
1903 if (!out.empty()) {
1904 producer.Push(std::move(out));
1905 }
1906 return Status::OK();
1907 };
1908
1909 TreeWalker::WalkAsync(client_, io_context_, bucket, key, kListObjectsMaxKeys,
1910 handle_results, handle_error, handle_recursion)
1911 .AddCallback([collector, producer, self](const Status& status) mutable {
1912 auto st = collector->Finish(self.get());
1913 if (!st.ok()) {
1914 producer.Push(st);
1915 }
1916 producer.Close();
1917 });
1918 return gen;
1919 }
1920
WalkForDeleteDir(const std::string & bucket,const std::string & key,std::vector<std::string> * file_keys,std::vector<std::string> * dir_keys)1921 Status WalkForDeleteDir(const std::string& bucket, const std::string& key,
1922 std::vector<std::string>* file_keys,
1923 std::vector<std::string>* dir_keys) {
1924 auto handle_results = [&](const std::string& prefix,
1925 const S3Model::ListObjectsV2Result& result) -> Status {
1926 // Walk "files"
1927 file_keys->reserve(file_keys->size() + result.GetContents().size());
1928 for (const auto& obj : result.GetContents()) {
1929 file_keys->emplace_back(FromAwsString(obj.GetKey()));
1930 }
1931 // Walk "directories"
1932 dir_keys->reserve(dir_keys->size() + result.GetCommonPrefixes().size());
1933 for (const auto& prefix : result.GetCommonPrefixes()) {
1934 dir_keys->emplace_back(FromAwsString(prefix.GetPrefix()));
1935 }
1936 return Status::OK();
1937 };
1938
1939 auto handle_error = [&](const AWSError<S3Errors>& error) -> Status {
1940 return ErrorToStatus(std::forward_as_tuple("When listing objects under key '", key,
1941 "' in bucket '", bucket, "': "),
1942 error);
1943 };
1944
1945 auto handle_recursion = [&](int32_t nesting_depth) -> Result<bool> {
1946 RETURN_NOT_OK(CheckNestingDepth(nesting_depth));
1947 return true; // Recurse
1948 };
1949
1950 return TreeWalker::Walk(client_, io_context_, bucket, key, kListObjectsMaxKeys,
1951 handle_results, handle_error, handle_recursion);
1952 }
1953
1954 // Delete multiple objects at once
DeleteObjectsAsync(const std::string & bucket,const std::vector<std::string> & keys)1955 Future<> DeleteObjectsAsync(const std::string& bucket,
1956 const std::vector<std::string>& keys) {
1957 struct DeleteCallback {
1958 const std::string bucket;
1959
1960 Status operator()(const S3Model::DeleteObjectsOutcome& outcome) {
1961 if (!outcome.IsSuccess()) {
1962 return ErrorToStatus(outcome.GetError());
1963 }
1964 // Also need to check per-key errors, even on successful outcome
1965 // See
1966 // https://docs.aws.amazon.com/fr_fr/AmazonS3/latest/API/multiobjectdeleteapi.html
1967 const auto& errors = outcome.GetResult().GetErrors();
1968 if (!errors.empty()) {
1969 std::stringstream ss;
1970 ss << "Got the following " << errors.size()
1971 << " errors when deleting objects in S3 bucket '" << bucket << "':\n";
1972 for (const auto& error : errors) {
1973 ss << "- key '" << error.GetKey() << "': " << error.GetMessage() << "\n";
1974 }
1975 return Status::IOError(ss.str());
1976 }
1977 return Status::OK();
1978 }
1979 };
1980
1981 const auto chunk_size = static_cast<size_t>(kMultipleDeleteMaxKeys);
1982 DeleteCallback delete_cb{bucket};
1983 auto client = client_;
1984
1985 std::vector<Future<>> futures;
1986 futures.reserve(keys.size() / chunk_size + 1);
1987
1988 for (size_t start = 0; start < keys.size(); start += chunk_size) {
1989 S3Model::DeleteObjectsRequest req;
1990 S3Model::Delete del;
1991 for (size_t i = start; i < std::min(keys.size(), chunk_size); ++i) {
1992 del.AddObjects(S3Model::ObjectIdentifier().WithKey(ToAwsString(keys[i])));
1993 }
1994 req.SetBucket(ToAwsString(bucket));
1995 req.SetDelete(std::move(del));
1996 ARROW_ASSIGN_OR_RAISE(auto fut, SubmitIO(io_context_, [client, req]() {
1997 return client->DeleteObjects(req);
1998 }));
1999 futures.push_back(std::move(fut).Then(delete_cb));
2000 }
2001
2002 return AllComplete(futures);
2003 }
2004
DeleteObjects(const std::string & bucket,const std::vector<std::string> & keys)2005 Status DeleteObjects(const std::string& bucket, const std::vector<std::string>& keys) {
2006 return DeleteObjectsAsync(bucket, keys).status();
2007 }
2008
DeleteDirContents(const std::string & bucket,const std::string & key)2009 Status DeleteDirContents(const std::string& bucket, const std::string& key) {
2010 std::vector<std::string> file_keys;
2011 std::vector<std::string> dir_keys;
2012 RETURN_NOT_OK(WalkForDeleteDir(bucket, key, &file_keys, &dir_keys));
2013 if (file_keys.empty() && dir_keys.empty() && !key.empty()) {
2014 // No contents found, is it an empty directory?
2015 bool exists = false;
2016 RETURN_NOT_OK(IsEmptyDirectory(bucket, key, &exists));
2017 if (!exists) {
2018 return PathNotFound(bucket, key);
2019 }
2020 }
2021 // First delete all "files", then delete all child "directories"
2022 RETURN_NOT_OK(DeleteObjects(bucket, file_keys));
2023 // Delete directories in reverse lexicographic order, to ensure children
2024 // are deleted before their parents (Minio).
2025 std::sort(dir_keys.rbegin(), dir_keys.rend());
2026 return DeleteObjects(bucket, dir_keys);
2027 }
2028
EnsureDirectoryExists(const S3Path & path)2029 Status EnsureDirectoryExists(const S3Path& path) {
2030 if (!path.key.empty()) {
2031 return CreateEmptyDir(path.bucket, path.key);
2032 }
2033 return Status::OK();
2034 }
2035
EnsureParentExists(const S3Path & path)2036 Status EnsureParentExists(const S3Path& path) {
2037 if (path.has_parent()) {
2038 return EnsureDirectoryExists(path.parent());
2039 }
2040 return Status::OK();
2041 }
2042
ProcessListBuckets(const Aws::S3::Model::ListBucketsOutcome & outcome)2043 static Result<std::vector<std::string>> ProcessListBuckets(
2044 const Aws::S3::Model::ListBucketsOutcome& outcome) {
2045 if (!outcome.IsSuccess()) {
2046 return ErrorToStatus(std::forward_as_tuple("When listing buckets: "),
2047 outcome.GetError());
2048 }
2049 std::vector<std::string> buckets;
2050 buckets.reserve(outcome.GetResult().GetBuckets().size());
2051 for (const auto& bucket : outcome.GetResult().GetBuckets()) {
2052 buckets.emplace_back(FromAwsString(bucket.GetName()));
2053 }
2054 return buckets;
2055 }
2056
ListBuckets()2057 Result<std::vector<std::string>> ListBuckets() {
2058 auto outcome = client_->ListBuckets();
2059 return ProcessListBuckets(outcome);
2060 }
2061
ListBucketsAsync(io::IOContext ctx)2062 Future<std::vector<std::string>> ListBucketsAsync(io::IOContext ctx) {
2063 auto self = shared_from_this();
2064 return DeferNotOk(SubmitIO(ctx, [self]() { return self->client_->ListBuckets(); }))
2065 // TODO(ARROW-12655) Change to Then(Impl::ProcessListBuckets)
2066 .Then([](const Aws::S3::Model::ListBucketsOutcome& outcome) {
2067 return Impl::ProcessListBuckets(outcome);
2068 });
2069 }
2070
OpenInputFile(const std::string & s,S3FileSystem * fs)2071 Result<std::shared_ptr<ObjectInputFile>> OpenInputFile(const std::string& s,
2072 S3FileSystem* fs) {
2073 ARROW_ASSIGN_OR_RAISE(auto path, S3Path::FromString(s));
2074 RETURN_NOT_OK(ValidateFilePath(path));
2075
2076 auto ptr = std::make_shared<ObjectInputFile>(client_, fs->io_context(), path);
2077 RETURN_NOT_OK(ptr->Init());
2078 return ptr;
2079 }
2080
OpenInputFile(const FileInfo & info,S3FileSystem * fs)2081 Result<std::shared_ptr<ObjectInputFile>> OpenInputFile(const FileInfo& info,
2082 S3FileSystem* fs) {
2083 if (info.type() == FileType::NotFound) {
2084 return ::arrow::fs::internal::PathNotFound(info.path());
2085 }
2086 if (info.type() != FileType::File && info.type() != FileType::Unknown) {
2087 return ::arrow::fs::internal::NotAFile(info.path());
2088 }
2089
2090 ARROW_ASSIGN_OR_RAISE(auto path, S3Path::FromString(info.path()));
2091 RETURN_NOT_OK(ValidateFilePath(path));
2092
2093 auto ptr =
2094 std::make_shared<ObjectInputFile>(client_, fs->io_context(), path, info.size());
2095 RETURN_NOT_OK(ptr->Init());
2096 return ptr;
2097 }
2098 };
2099
S3FileSystem(const S3Options & options,const io::IOContext & io_context)2100 S3FileSystem::S3FileSystem(const S3Options& options, const io::IOContext& io_context)
2101 : FileSystem(io_context), impl_(std::make_shared<Impl>(options, io_context)) {
2102 default_async_is_sync_ = false;
2103 }
2104
~S3FileSystem()2105 S3FileSystem::~S3FileSystem() {}
2106
Make(const S3Options & options,const io::IOContext & io_context)2107 Result<std::shared_ptr<S3FileSystem>> S3FileSystem::Make(
2108 const S3Options& options, const io::IOContext& io_context) {
2109 RETURN_NOT_OK(CheckS3Initialized());
2110
2111 std::shared_ptr<S3FileSystem> ptr(new S3FileSystem(options, io_context));
2112 RETURN_NOT_OK(ptr->impl_->Init());
2113 return ptr;
2114 }
2115
Equals(const FileSystem & other) const2116 bool S3FileSystem::Equals(const FileSystem& other) const {
2117 if (this == &other) {
2118 return true;
2119 }
2120 if (other.type_name() != type_name()) {
2121 return false;
2122 }
2123 const auto& s3fs = ::arrow::internal::checked_cast<const S3FileSystem&>(other);
2124 return options().Equals(s3fs.options());
2125 }
2126
options() const2127 S3Options S3FileSystem::options() const { return impl_->options(); }
2128
region() const2129 std::string S3FileSystem::region() const { return impl_->region(); }
2130
GetFileInfo(const std::string & s)2131 Result<FileInfo> S3FileSystem::GetFileInfo(const std::string& s) {
2132 ARROW_ASSIGN_OR_RAISE(auto path, S3Path::FromString(s));
2133 FileInfo info;
2134 info.set_path(s);
2135
2136 if (path.empty()) {
2137 // It's the root path ""
2138 info.set_type(FileType::Directory);
2139 return info;
2140 } else if (path.key.empty()) {
2141 // It's a bucket
2142 S3Model::HeadBucketRequest req;
2143 req.SetBucket(ToAwsString(path.bucket));
2144
2145 auto outcome = impl_->client_->HeadBucket(req);
2146 if (!outcome.IsSuccess()) {
2147 if (!IsNotFound(outcome.GetError())) {
2148 return ErrorToStatus(
2149 std::forward_as_tuple("When getting information for bucket '", path.bucket,
2150 "': "),
2151 outcome.GetError());
2152 }
2153 info.set_type(FileType::NotFound);
2154 return info;
2155 }
2156 // NOTE: S3 doesn't have a bucket modification time. Only a creation
2157 // time is available, and you have to list all buckets to get it.
2158 info.set_type(FileType::Directory);
2159 return info;
2160 } else {
2161 // It's an object
2162 S3Model::HeadObjectRequest req;
2163 req.SetBucket(ToAwsString(path.bucket));
2164 req.SetKey(ToAwsString(path.key));
2165
2166 auto outcome = impl_->client_->HeadObject(req);
2167 if (outcome.IsSuccess()) {
2168 // "File" object found
2169 FileObjectToInfo(outcome.GetResult(), &info);
2170 return info;
2171 }
2172 if (!IsNotFound(outcome.GetError())) {
2173 return ErrorToStatus(
2174 std::forward_as_tuple("When getting information for key '", path.key,
2175 "' in bucket '", path.bucket, "': "),
2176 outcome.GetError());
2177 }
2178 // Not found => perhaps it's an empty "directory"
2179 bool is_dir = false;
2180 RETURN_NOT_OK(impl_->IsEmptyDirectory(path, &is_dir));
2181 if (is_dir) {
2182 info.set_type(FileType::Directory);
2183 return info;
2184 }
2185 // Not found => perhaps it's a non-empty "directory"
2186 RETURN_NOT_OK(impl_->IsNonEmptyDirectory(path, &is_dir));
2187 if (is_dir) {
2188 info.set_type(FileType::Directory);
2189 } else {
2190 info.set_type(FileType::NotFound);
2191 }
2192 return info;
2193 }
2194 }
2195
GetFileInfo(const FileSelector & select)2196 Result<FileInfoVector> S3FileSystem::GetFileInfo(const FileSelector& select) {
2197 ARROW_ASSIGN_OR_RAISE(auto base_path, S3Path::FromString(select.base_dir));
2198
2199 FileInfoVector results;
2200
2201 if (base_path.empty()) {
2202 // List all buckets
2203 ARROW_ASSIGN_OR_RAISE(auto buckets, impl_->ListBuckets());
2204 for (const auto& bucket : buckets) {
2205 FileInfo info;
2206 info.set_path(bucket);
2207 info.set_type(FileType::Directory);
2208 results.push_back(std::move(info));
2209 if (select.recursive) {
2210 RETURN_NOT_OK(impl_->Walk(select, bucket, "", &results));
2211 }
2212 }
2213 return results;
2214 }
2215
2216 // Nominal case -> walk a single bucket
2217 RETURN_NOT_OK(impl_->Walk(select, base_path.bucket, base_path.key, &results));
2218 return results;
2219 }
2220
GetFileInfoGenerator(const FileSelector & select)2221 FileInfoGenerator S3FileSystem::GetFileInfoGenerator(const FileSelector& select) {
2222 auto maybe_base_path = S3Path::FromString(select.base_dir);
2223 if (!maybe_base_path.ok()) {
2224 return MakeFailingGenerator<FileInfoVector>(maybe_base_path.status());
2225 }
2226 auto base_path = *std::move(maybe_base_path);
2227
2228 if (base_path.empty()) {
2229 // List all buckets, then possibly recurse
2230 PushGenerator<AsyncGenerator<FileInfoVector>> gen;
2231 auto producer = gen.producer();
2232
2233 auto fut = impl_->ListBucketsAsync(io_context());
2234 auto impl = impl_->shared_from_this();
2235 fut.AddCallback(
2236 [producer, select, impl](const Result<std::vector<std::string>>& res) mutable {
2237 if (!res.ok()) {
2238 producer.Push(res.status());
2239 producer.Close();
2240 return;
2241 }
2242 FileInfoVector buckets;
2243 for (const auto& bucket : *res) {
2244 buckets.push_back(FileInfo{bucket, FileType::Directory});
2245 }
2246 // Generate all bucket infos
2247 auto buckets_fut = Future<FileInfoVector>::MakeFinished(std::move(buckets));
2248 producer.Push(MakeSingleFutureGenerator(buckets_fut));
2249 if (select.recursive) {
2250 // Generate recursive walk for each bucket in turn
2251 for (const auto& bucket : *buckets_fut.result()) {
2252 producer.Push(impl->WalkAsync(select, bucket.path(), ""));
2253 }
2254 }
2255 producer.Close();
2256 });
2257
2258 return MakeConcatenatedGenerator(
2259 AsyncGenerator<AsyncGenerator<FileInfoVector>>{std::move(gen)});
2260 }
2261
2262 // Nominal case -> walk a single bucket
2263 return impl_->WalkAsync(select, base_path.bucket, base_path.key);
2264 }
2265
CreateDir(const std::string & s,bool recursive)2266 Status S3FileSystem::CreateDir(const std::string& s, bool recursive) {
2267 ARROW_ASSIGN_OR_RAISE(auto path, S3Path::FromString(s));
2268
2269 if (path.key.empty()) {
2270 // Create bucket
2271 return impl_->CreateBucket(path.bucket);
2272 }
2273
2274 // Create object
2275 if (recursive) {
2276 // Ensure bucket exists
2277 ARROW_ASSIGN_OR_RAISE(bool bucket_exists, impl_->BucketExists(path.bucket));
2278 if (!bucket_exists) {
2279 RETURN_NOT_OK(impl_->CreateBucket(path.bucket));
2280 }
2281 // Ensure that all parents exist, then the directory itself
2282 std::string parent_key;
2283 for (const auto& part : path.key_parts) {
2284 parent_key += part;
2285 parent_key += kSep;
2286 RETURN_NOT_OK(impl_->CreateEmptyObject(path.bucket, parent_key));
2287 }
2288 return Status::OK();
2289 } else {
2290 // Check parent dir exists
2291 if (path.has_parent()) {
2292 S3Path parent_path = path.parent();
2293 bool exists;
2294 RETURN_NOT_OK(impl_->IsNonEmptyDirectory(parent_path, &exists));
2295 if (!exists) {
2296 RETURN_NOT_OK(impl_->IsEmptyDirectory(parent_path, &exists));
2297 }
2298 if (!exists) {
2299 return Status::IOError("Cannot create directory '", path.full_path,
2300 "': parent directory does not exist");
2301 }
2302 }
2303
2304 // XXX Should we check that no non-directory entry exists?
2305 // Minio does it for us, not sure about other S3 implementations.
2306 return impl_->CreateEmptyDir(path.bucket, path.key);
2307 }
2308 }
2309
DeleteDir(const std::string & s)2310 Status S3FileSystem::DeleteDir(const std::string& s) {
2311 ARROW_ASSIGN_OR_RAISE(auto path, S3Path::FromString(s));
2312
2313 if (path.empty()) {
2314 return Status::NotImplemented("Cannot delete all S3 buckets");
2315 }
2316 RETURN_NOT_OK(impl_->DeleteDirContents(path.bucket, path.key));
2317 if (path.key.empty()) {
2318 // Delete bucket
2319 S3Model::DeleteBucketRequest req;
2320 req.SetBucket(ToAwsString(path.bucket));
2321 return OutcomeToStatus(
2322 std::forward_as_tuple("When deleting bucket '", path.bucket, "': "),
2323 impl_->client_->DeleteBucket(req));
2324 } else {
2325 // Delete "directory"
2326 RETURN_NOT_OK(impl_->DeleteObject(path.bucket, path.key + kSep));
2327 // Parent may be implicitly deleted if it became empty, recreate it
2328 return impl_->EnsureParentExists(path);
2329 }
2330 }
2331
DeleteDirContents(const std::string & s)2332 Status S3FileSystem::DeleteDirContents(const std::string& s) {
2333 ARROW_ASSIGN_OR_RAISE(auto path, S3Path::FromString(s));
2334
2335 if (path.empty()) {
2336 return Status::NotImplemented("Cannot delete all S3 buckets");
2337 }
2338 RETURN_NOT_OK(impl_->DeleteDirContents(path.bucket, path.key));
2339 // Directory may be implicitly deleted, recreate it
2340 return impl_->EnsureDirectoryExists(path);
2341 }
2342
DeleteRootDirContents()2343 Status S3FileSystem::DeleteRootDirContents() {
2344 return Status::NotImplemented("Cannot delete all S3 buckets");
2345 }
2346
DeleteFile(const std::string & s)2347 Status S3FileSystem::DeleteFile(const std::string& s) {
2348 ARROW_ASSIGN_OR_RAISE(auto path, S3Path::FromString(s));
2349 RETURN_NOT_OK(ValidateFilePath(path));
2350
2351 // Check the object exists
2352 S3Model::HeadObjectRequest req;
2353 req.SetBucket(ToAwsString(path.bucket));
2354 req.SetKey(ToAwsString(path.key));
2355
2356 auto outcome = impl_->client_->HeadObject(req);
2357 if (!outcome.IsSuccess()) {
2358 if (IsNotFound(outcome.GetError())) {
2359 return PathNotFound(path);
2360 } else {
2361 return ErrorToStatus(
2362 std::forward_as_tuple("When getting information for key '", path.key,
2363 "' in bucket '", path.bucket, "': "),
2364 outcome.GetError());
2365 }
2366 }
2367 // Object found, delete it
2368 RETURN_NOT_OK(impl_->DeleteObject(path.bucket, path.key));
2369 // Parent may be implicitly deleted if it became empty, recreate it
2370 return impl_->EnsureParentExists(path);
2371 }
2372
Move(const std::string & src,const std::string & dest)2373 Status S3FileSystem::Move(const std::string& src, const std::string& dest) {
2374 // XXX We don't implement moving directories as it would be too expensive:
2375 // one must copy all directory contents one by one (including object data),
2376 // then delete the original contents.
2377
2378 ARROW_ASSIGN_OR_RAISE(auto src_path, S3Path::FromString(src));
2379 RETURN_NOT_OK(ValidateFilePath(src_path));
2380 ARROW_ASSIGN_OR_RAISE(auto dest_path, S3Path::FromString(dest));
2381 RETURN_NOT_OK(ValidateFilePath(dest_path));
2382
2383 if (src_path == dest_path) {
2384 return Status::OK();
2385 }
2386 RETURN_NOT_OK(impl_->CopyObject(src_path, dest_path));
2387 RETURN_NOT_OK(impl_->DeleteObject(src_path.bucket, src_path.key));
2388 // Source parent may be implicitly deleted if it became empty, recreate it
2389 return impl_->EnsureParentExists(src_path);
2390 }
2391
CopyFile(const std::string & src,const std::string & dest)2392 Status S3FileSystem::CopyFile(const std::string& src, const std::string& dest) {
2393 ARROW_ASSIGN_OR_RAISE(auto src_path, S3Path::FromString(src));
2394 RETURN_NOT_OK(ValidateFilePath(src_path));
2395 ARROW_ASSIGN_OR_RAISE(auto dest_path, S3Path::FromString(dest));
2396 RETURN_NOT_OK(ValidateFilePath(dest_path));
2397
2398 if (src_path == dest_path) {
2399 return Status::OK();
2400 }
2401 return impl_->CopyObject(src_path, dest_path);
2402 }
2403
OpenInputStream(const std::string & s)2404 Result<std::shared_ptr<io::InputStream>> S3FileSystem::OpenInputStream(
2405 const std::string& s) {
2406 return impl_->OpenInputFile(s, this);
2407 }
2408
OpenInputStream(const FileInfo & info)2409 Result<std::shared_ptr<io::InputStream>> S3FileSystem::OpenInputStream(
2410 const FileInfo& info) {
2411 return impl_->OpenInputFile(info, this);
2412 }
2413
OpenInputFile(const std::string & s)2414 Result<std::shared_ptr<io::RandomAccessFile>> S3FileSystem::OpenInputFile(
2415 const std::string& s) {
2416 return impl_->OpenInputFile(s, this);
2417 }
2418
OpenInputFile(const FileInfo & info)2419 Result<std::shared_ptr<io::RandomAccessFile>> S3FileSystem::OpenInputFile(
2420 const FileInfo& info) {
2421 return impl_->OpenInputFile(info, this);
2422 }
2423
OpenOutputStream(const std::string & s,const std::shared_ptr<const KeyValueMetadata> & metadata)2424 Result<std::shared_ptr<io::OutputStream>> S3FileSystem::OpenOutputStream(
2425 const std::string& s, const std::shared_ptr<const KeyValueMetadata>& metadata) {
2426 ARROW_ASSIGN_OR_RAISE(auto path, S3Path::FromString(s));
2427 RETURN_NOT_OK(ValidateFilePath(path));
2428
2429 auto ptr = std::make_shared<ObjectOutputStream>(impl_->client_, io_context(), path,
2430 impl_->options(), metadata);
2431 RETURN_NOT_OK(ptr->Init());
2432 return ptr;
2433 }
2434
OpenAppendStream(const std::string & path,const std::shared_ptr<const KeyValueMetadata> & metadata)2435 Result<std::shared_ptr<io::OutputStream>> S3FileSystem::OpenAppendStream(
2436 const std::string& path, const std::shared_ptr<const KeyValueMetadata>& metadata) {
2437 // XXX Investigate UploadPartCopy? Does it work with source == destination?
2438 // https://docs.aws.amazon.com/AmazonS3/latest/API/mpUploadUploadPartCopy.html
2439 // (but would need to fall back to GET if the current data is < 5 MB)
2440 return Status::NotImplemented("It is not possible to append efficiently to S3 objects");
2441 }
2442
2443 //
2444 // Top-level utility functions
2445 //
2446
ResolveBucketRegion(const std::string & bucket)2447 Result<std::string> ResolveBucketRegion(const std::string& bucket) {
2448 ARROW_ASSIGN_OR_RAISE(auto resolver, RegionResolver::DefaultInstance());
2449 return resolver->ResolveRegion(bucket);
2450 }
2451
2452 } // namespace fs
2453 } // namespace arrow
2454