1 // Licensed to the Apache Software Foundation (ASF) under one
2 // or more contributor license agreements. See the NOTICE file
3 // distributed with this work for additional information
4 // regarding copyright ownership. The ASF licenses this file
5 // to you under the Apache License, Version 2.0 (the
6 // "License"); you may not use this file except in compliance
7 // with the License. You may obtain a copy of the License at
8 //
9 // http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing,
12 // software distributed under the License is distributed on an
13 // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14 // KIND, either express or implied. See the License for the
15 // specific language governing permissions and limitations
16 // under the License.
17
18 #pragma once
19
20 #include <sstream>
21 #include <string>
22 #include <tuple>
23 #include <utility>
24
25 #include <aws/core/Aws.h>
26 #include <aws/core/client/RetryStrategy.h>
27 #include <aws/core/http/HttpTypes.h>
28 #include <aws/core/utils/DateTime.h>
29 #include <aws/core/utils/StringUtils.h>
30
31 #include "arrow/filesystem/filesystem.h"
32 #include "arrow/status.h"
33 #include "arrow/util/logging.h"
34 #include "arrow/util/print.h"
35 #include "arrow/util/string_view.h"
36
37 namespace arrow {
38 namespace fs {
39 namespace internal {
40
// Implementation detail of ARROW_AWS_ASSIGN_OR_RAISE.
//
// Stores the value of `expr` in a fresh local named `outcome_var`. If its
// IsSuccess() is false, returns ErrorToStatus(GetError()) from the enclosing
// function; otherwise moves the owned result into `target`.
#define ARROW_AWS_ASSIGN_OR_RAISE_IMPL(outcome_var, target, expr) \
  auto outcome_var = (expr);                                      \
  if (!outcome_var.IsSuccess()) {                                 \
    return ErrorToStatus(outcome_var.GetError());                 \
  }                                                               \
  target = std::move(outcome_var).GetResultWithOwnership();

// Pastes two tokens together to build the name of the temporary variable.
#define ARROW_AWS_ASSIGN_OR_RAISE_NAME(prefix, suffix) ARROW_CONCAT(prefix, suffix)

// Assigns the successful result of `expr` to `target`, or returns the
// corresponding error Status from the enclosing function.
// __COUNTER__ makes the temporary's name unique for each expansion.
#define ARROW_AWS_ASSIGN_OR_RAISE(target, expr)                                 \
  ARROW_AWS_ASSIGN_OR_RAISE_IMPL(                                               \
      ARROW_AWS_ASSIGN_OR_RAISE_NAME(_aws_error_or_value, __COUNTER__), target, \
      expr);
53
// The kinds of S3 server implementation distinguished by this file.
// XXX Should we expose this at some point?
enum class S3Backend {
  Amazon,  // "server" response header mentions "AmazonS3"
  Minio,   // "server" response header mentions "MinIO"
  Other    // anything else, including a missing "server" header
};
56
57 // Detect the S3 backend type from the S3 server's response headers
DetectS3Backend(const Aws::Http::HeaderValueCollection & headers)58 S3Backend DetectS3Backend(const Aws::Http::HeaderValueCollection& headers) {
59 const auto it = headers.find("server");
60 if (it != headers.end()) {
61 const auto& value = util::string_view(it->second);
62 if (value.find("AmazonS3") != std::string::npos) {
63 return S3Backend::Amazon;
64 }
65 if (value.find("MinIO") != std::string::npos) {
66 return S3Backend::Minio;
67 }
68 }
69 return S3Backend::Other;
70 }
71
72 template <typename Error>
DetectS3Backend(const Aws::Client::AWSError<Error> & error)73 S3Backend DetectS3Backend(const Aws::Client::AWSError<Error>& error) {
74 return DetectS3Backend(error.GetResponseHeaders());
75 }
76
77 template <typename Error>
IsConnectError(const Aws::Client::AWSError<Error> & error)78 inline bool IsConnectError(const Aws::Client::AWSError<Error>& error) {
79 if (error.ShouldRetry()) {
80 return true;
81 }
82 // Sometimes Minio may fail with a 503 error
83 // (exception name: XMinioServerNotInitialized,
84 // message: "Server not initialized, please try again")
85 if (error.GetExceptionName() == "XMinioServerNotInitialized") {
86 return true;
87 }
88 return false;
89 }
90
IsNotFound(const Aws::Client::AWSError<Aws::S3::S3Errors> & error)91 inline bool IsNotFound(const Aws::Client::AWSError<Aws::S3::S3Errors>& error) {
92 const auto error_type = error.GetErrorType();
93 return (error_type == Aws::S3::S3Errors::NO_SUCH_BUCKET ||
94 error_type == Aws::S3::S3Errors::RESOURCE_NOT_FOUND);
95 }
96
IsAlreadyExists(const Aws::Client::AWSError<Aws::S3::S3Errors> & error)97 inline bool IsAlreadyExists(const Aws::Client::AWSError<Aws::S3::S3Errors>& error) {
98 const auto error_type = error.GetErrorType();
99 return (error_type == Aws::S3::S3Errors::BUCKET_ALREADY_EXISTS ||
100 error_type == Aws::S3::S3Errors::BUCKET_ALREADY_OWNED_BY_YOU);
101 }
102
103 // TODO qualify error messages with a prefix indicating context
104 // (e.g. "When completing multipart upload to bucket 'xxx', key 'xxx': ...")
105 template <typename ErrorType>
ErrorToStatus(const std::string & prefix,const Aws::Client::AWSError<ErrorType> & error)106 Status ErrorToStatus(const std::string& prefix,
107 const Aws::Client::AWSError<ErrorType>& error) {
108 // XXX Handle fine-grained error types
109 // See
110 // https://sdk.amazonaws.com/cpp/api/LATEST/namespace_aws_1_1_s3.html#ae3f82f8132b619b6e91c88a9f1bde371
111 return Status::IOError(prefix, "AWS Error [code ",
112 static_cast<int>(error.GetErrorType()),
113 "]: ", error.GetMessage());
114 }
115
116 template <typename ErrorType, typename... Args>
ErrorToStatus(const std::tuple<Args &...> & prefix,const Aws::Client::AWSError<ErrorType> & error)117 Status ErrorToStatus(const std::tuple<Args&...>& prefix,
118 const Aws::Client::AWSError<ErrorType>& error) {
119 std::stringstream ss;
120 ::arrow::internal::PrintTuple(&ss, prefix);
121 return ErrorToStatus(ss.str(), error);
122 }
123
124 template <typename ErrorType>
ErrorToStatus(const Aws::Client::AWSError<ErrorType> & error)125 Status ErrorToStatus(const Aws::Client::AWSError<ErrorType>& error) {
126 return ErrorToStatus(std::string(), error);
127 }
128
129 template <typename AwsResult, typename Error>
OutcomeToStatus(const std::string & prefix,const Aws::Utils::Outcome<AwsResult,Error> & outcome)130 Status OutcomeToStatus(const std::string& prefix,
131 const Aws::Utils::Outcome<AwsResult, Error>& outcome) {
132 if (outcome.IsSuccess()) {
133 return Status::OK();
134 } else {
135 return ErrorToStatus(prefix, outcome.GetError());
136 }
137 }
138
139 template <typename AwsResult, typename Error, typename... Args>
OutcomeToStatus(const std::tuple<Args &...> & prefix,const Aws::Utils::Outcome<AwsResult,Error> & outcome)140 Status OutcomeToStatus(const std::tuple<Args&...>& prefix,
141 const Aws::Utils::Outcome<AwsResult, Error>& outcome) {
142 if (outcome.IsSuccess()) {
143 return Status::OK();
144 } else {
145 return ErrorToStatus(prefix, outcome.GetError());
146 }
147 }
148
149 template <typename AwsResult, typename Error>
OutcomeToStatus(const Aws::Utils::Outcome<AwsResult,Error> & outcome)150 Status OutcomeToStatus(const Aws::Utils::Outcome<AwsResult, Error>& outcome) {
151 return OutcomeToStatus(std::string(), outcome);
152 }
153
154 template <typename AwsResult, typename Error>
OutcomeToResult(Aws::Utils::Outcome<AwsResult,Error> outcome)155 Result<AwsResult> OutcomeToResult(Aws::Utils::Outcome<AwsResult, Error> outcome) {
156 if (outcome.IsSuccess()) {
157 return std::move(outcome).GetResultWithOwnership();
158 } else {
159 return ErrorToStatus(outcome.GetError());
160 }
161 }
162
ToAwsString(const std::string & s)163 inline Aws::String ToAwsString(const std::string& s) {
164 // Direct construction of Aws::String from std::string doesn't work because
165 // it uses a specific Allocator class.
166 return Aws::String(s.begin(), s.end());
167 }
168
FromAwsString(const Aws::String & s)169 inline util::string_view FromAwsString(const Aws::String& s) {
170 return {s.data(), s.length()};
171 }
172
ToURLEncodedAwsString(const std::string & s)173 inline Aws::String ToURLEncodedAwsString(const std::string& s) {
174 return Aws::Utils::StringUtils::URLEncode(s.data());
175 }
176
FromAwsDatetime(const Aws::Utils::DateTime & dt)177 inline TimePoint FromAwsDatetime(const Aws::Utils::DateTime& dt) {
178 return std::chrono::time_point_cast<std::chrono::nanoseconds>(dt.UnderlyingTimestamp());
179 }
180
181 // A connect retry strategy with a controlled max duration.
182
183 class ConnectRetryStrategy : public Aws::Client::RetryStrategy {
184 public:
185 static const int32_t kDefaultRetryInterval = 200; /* milliseconds */
186 static const int32_t kDefaultMaxRetryDuration = 6000; /* milliseconds */
187
188 explicit ConnectRetryStrategy(int32_t retry_interval = kDefaultRetryInterval,
189 int32_t max_retry_duration = kDefaultMaxRetryDuration)
retry_interval_(retry_interval)190 : retry_interval_(retry_interval), max_retry_duration_(max_retry_duration) {}
191
ShouldRetry(const Aws::Client::AWSError<Aws::Client::CoreErrors> & error,long attempted_retries)192 bool ShouldRetry(const Aws::Client::AWSError<Aws::Client::CoreErrors>& error,
193 long attempted_retries) const override { // NOLINT
194 if (!IsConnectError(error)) {
195 // Not a connect error, don't retry
196 return false;
197 }
198 return attempted_retries * retry_interval_ < max_retry_duration_;
199 }
200
CalculateDelayBeforeNextRetry(const Aws::Client::AWSError<Aws::Client::CoreErrors> & error,long attempted_retries)201 long CalculateDelayBeforeNextRetry( // NOLINT
202 const Aws::Client::AWSError<Aws::Client::CoreErrors>& error,
203 long attempted_retries) const override { // NOLINT
204 return retry_interval_;
205 }
206
207 protected:
208 int32_t retry_interval_;
209 int32_t max_retry_duration_;
210 };
211
212 } // namespace internal
213 } // namespace fs
214 } // namespace arrow
215