1 // Licensed to the Apache Software Foundation (ASF) under one
2 // or more contributor license agreements. See the NOTICE file
3 // distributed with this work for additional information
4 // regarding copyright ownership. The ASF licenses this file
5 // to you under the Apache License, Version 2.0 (the
6 // "License"); you may not use this file except in compliance
7 // with the License. You may obtain a copy of the License at
8 //
9 // http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing,
12 // software distributed under the License is distributed on an
13 // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14 // KIND, either express or implied. See the License for the
15 // specific language governing permissions and limitations
16 // under the License.
17
18 #pragma once
19
#include <cstdint>
#include <sstream>
#include <string>
#include <tuple>
#include <utility>

#include <aws/core/Aws.h>
#include <aws/core/client/RetryStrategy.h>
#include <aws/core/utils/DateTime.h>
#include <aws/core/utils/StringUtils.h>
#include <aws/s3/S3Errors.h>

#include "arrow/filesystem/filesystem.h"
#include "arrow/status.h"
#include "arrow/util/logging.h"
#include "arrow/util/print.h"
#include "arrow/util/string_view.h"
35
36 namespace arrow {
37 namespace fs {
38 namespace internal {
39
// Implementation of ARROW_AWS_ASSIGN_OR_RAISE.
// Evaluates `rexpr` (an AWS SDK outcome object) once into the local
// `outcome_name`. If the outcome did not succeed, the enclosing function
// returns `ErrorToStatus(outcome_name.GetError())`. Otherwise, the outcome's
// result is moved into `lhs`.
//
// The expansion is a plain sequence of statements, not wrapped in a scope or
// do/while. Therefore:
//  - `lhs` may be a declaration (e.g. `auto result`) that stays visible
//    after the macro;
//  - the macro must not be used as the unbraced body of an if/else/loop;
//  - the enclosing function must be able to return the Status produced by
//    ErrorToStatus, and ErrorToStatus must be visible at the expansion site.
#define ARROW_AWS_ASSIGN_OR_RAISE_IMPL(outcome_name, lhs, rexpr) \
  auto outcome_name = (rexpr);                                   \
  if (!outcome_name.IsSuccess()) {                               \
    return ErrorToStatus(outcome_name.GetError());               \
  }                                                              \
  lhs = std::move(outcome_name).GetResultWithOwnership();

// Concatenates `x` and `y` into one identifier.
// ARROW_CONCAT is defined elsewhere; presumably in arrow/util/macros.h.
#define ARROW_AWS_ASSIGN_OR_RAISE_NAME(x, y) ARROW_CONCAT(x, y)

// ARROW_AWS_ASSIGN_OR_RAISE(lhs, rexpr): assign the result of a successful AWS
// outcome `rexpr` to `lhs`, or return an error Status from the enclosing
// function. The temporary outcome gets a name suffixed with __COUNTER__, so
// the macro can appear several times in the same scope without name clashes.
#define ARROW_AWS_ASSIGN_OR_RAISE(lhs, rexpr) \
  ARROW_AWS_ASSIGN_OR_RAISE_IMPL(             \
      ARROW_AWS_ASSIGN_OR_RAISE_NAME(_aws_error_or_value, __COUNTER__), lhs, rexpr);
52
53 template <typename Error>
IsConnectError(const Aws::Client::AWSError<Error> & error)54 inline bool IsConnectError(const Aws::Client::AWSError<Error>& error) {
55 if (error.ShouldRetry()) {
56 return true;
57 }
58 // Sometimes Minio may fail with a 503 error
59 // (exception name: XMinioServerNotInitialized,
60 // message: "Server not initialized, please try again")
61 if (error.GetExceptionName() == "XMinioServerNotInitialized") {
62 return true;
63 }
64 return false;
65 }
66
IsNotFound(const Aws::Client::AWSError<Aws::S3::S3Errors> & error)67 inline bool IsNotFound(const Aws::Client::AWSError<Aws::S3::S3Errors>& error) {
68 const auto error_type = error.GetErrorType();
69 return (error_type == Aws::S3::S3Errors::NO_SUCH_BUCKET ||
70 error_type == Aws::S3::S3Errors::RESOURCE_NOT_FOUND);
71 }
72
IsAlreadyExists(const Aws::Client::AWSError<Aws::S3::S3Errors> & error)73 inline bool IsAlreadyExists(const Aws::Client::AWSError<Aws::S3::S3Errors>& error) {
74 const auto error_type = error.GetErrorType();
75 return (error_type == Aws::S3::S3Errors::BUCKET_ALREADY_EXISTS ||
76 error_type == Aws::S3::S3Errors::BUCKET_ALREADY_OWNED_BY_YOU);
77 }
78
79 // TODO qualify error messages with a prefix indicating context
80 // (e.g. "When completing multipart upload to bucket 'xxx', key 'xxx': ...")
81 template <typename ErrorType>
ErrorToStatus(const std::string & prefix,const Aws::Client::AWSError<ErrorType> & error)82 Status ErrorToStatus(const std::string& prefix,
83 const Aws::Client::AWSError<ErrorType>& error) {
84 // XXX Handle fine-grained error types
85 // See
86 // https://sdk.amazonaws.com/cpp/api/LATEST/namespace_aws_1_1_s3.html#ae3f82f8132b619b6e91c88a9f1bde371
87 return Status::IOError(prefix, "AWS Error [code ",
88 static_cast<int>(error.GetErrorType()),
89 "]: ", error.GetMessage());
90 }
91
92 template <typename ErrorType, typename... Args>
ErrorToStatus(const std::tuple<Args &...> & prefix,const Aws::Client::AWSError<ErrorType> & error)93 Status ErrorToStatus(const std::tuple<Args&...>& prefix,
94 const Aws::Client::AWSError<ErrorType>& error) {
95 std::stringstream ss;
96 ::arrow::internal::PrintTuple(&ss, prefix);
97 return ErrorToStatus(ss.str(), error);
98 }
99
100 template <typename ErrorType>
ErrorToStatus(const Aws::Client::AWSError<ErrorType> & error)101 Status ErrorToStatus(const Aws::Client::AWSError<ErrorType>& error) {
102 return ErrorToStatus(std::string(), error);
103 }
104
105 template <typename AwsResult, typename Error>
OutcomeToStatus(const std::string & prefix,const Aws::Utils::Outcome<AwsResult,Error> & outcome)106 Status OutcomeToStatus(const std::string& prefix,
107 const Aws::Utils::Outcome<AwsResult, Error>& outcome) {
108 if (outcome.IsSuccess()) {
109 return Status::OK();
110 } else {
111 return ErrorToStatus(prefix, outcome.GetError());
112 }
113 }
114
115 template <typename AwsResult, typename Error, typename... Args>
OutcomeToStatus(const std::tuple<Args &...> & prefix,const Aws::Utils::Outcome<AwsResult,Error> & outcome)116 Status OutcomeToStatus(const std::tuple<Args&...>& prefix,
117 const Aws::Utils::Outcome<AwsResult, Error>& outcome) {
118 if (outcome.IsSuccess()) {
119 return Status::OK();
120 } else {
121 return ErrorToStatus(prefix, outcome.GetError());
122 }
123 }
124
125 template <typename AwsResult, typename Error>
OutcomeToStatus(const Aws::Utils::Outcome<AwsResult,Error> & outcome)126 Status OutcomeToStatus(const Aws::Utils::Outcome<AwsResult, Error>& outcome) {
127 return OutcomeToStatus(std::string(), outcome);
128 }
129
130 template <typename AwsResult, typename Error>
OutcomeToResult(Aws::Utils::Outcome<AwsResult,Error> outcome)131 Result<AwsResult> OutcomeToResult(Aws::Utils::Outcome<AwsResult, Error> outcome) {
132 if (outcome.IsSuccess()) {
133 return std::move(outcome).GetResultWithOwnership();
134 } else {
135 return ErrorToStatus(outcome.GetError());
136 }
137 }
138
ToAwsString(const std::string & s)139 inline Aws::String ToAwsString(const std::string& s) {
140 // Direct construction of Aws::String from std::string doesn't work because
141 // it uses a specific Allocator class.
142 return Aws::String(s.begin(), s.end());
143 }
144
FromAwsString(const Aws::String & s)145 inline util::string_view FromAwsString(const Aws::String& s) {
146 return {s.data(), s.length()};
147 }
148
ToURLEncodedAwsString(const std::string & s)149 inline Aws::String ToURLEncodedAwsString(const std::string& s) {
150 return Aws::Utils::StringUtils::URLEncode(s.data());
151 }
152
FromAwsDatetime(const Aws::Utils::DateTime & dt)153 inline TimePoint FromAwsDatetime(const Aws::Utils::DateTime& dt) {
154 return std::chrono::time_point_cast<std::chrono::nanoseconds>(dt.UnderlyingTimestamp());
155 }
156
157 // A connect retry strategy with a controlled max duration.
158
159 class ConnectRetryStrategy : public Aws::Client::RetryStrategy {
160 public:
161 static const int32_t kDefaultRetryInterval = 200; /* milliseconds */
162 static const int32_t kDefaultMaxRetryDuration = 6000; /* milliseconds */
163
164 explicit ConnectRetryStrategy(int32_t retry_interval = kDefaultRetryInterval,
165 int32_t max_retry_duration = kDefaultMaxRetryDuration)
retry_interval_(retry_interval)166 : retry_interval_(retry_interval), max_retry_duration_(max_retry_duration) {}
167
ShouldRetry(const Aws::Client::AWSError<Aws::Client::CoreErrors> & error,long attempted_retries)168 bool ShouldRetry(const Aws::Client::AWSError<Aws::Client::CoreErrors>& error,
169 long attempted_retries) const override { // NOLINT
170 if (!IsConnectError(error)) {
171 // Not a connect error, don't retry
172 return false;
173 }
174 return attempted_retries * retry_interval_ < max_retry_duration_;
175 }
176
CalculateDelayBeforeNextRetry(const Aws::Client::AWSError<Aws::Client::CoreErrors> & error,long attempted_retries)177 long CalculateDelayBeforeNextRetry( // NOLINT
178 const Aws::Client::AWSError<Aws::Client::CoreErrors>& error,
179 long attempted_retries) const override { // NOLINT
180 return retry_interval_;
181 }
182
183 protected:
184 int32_t retry_interval_;
185 int32_t max_retry_duration_;
186 };
187
188 } // namespace internal
189 } // namespace fs
190 } // namespace arrow
191