// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.
17 
18 #pragma once
19 
#include <cstdint>
#include <sstream>
#include <string>
#include <tuple>
#include <utility>

#include <aws/core/Aws.h>
#include <aws/core/client/RetryStrategy.h>
#include <aws/core/utils/DateTime.h>
#include <aws/core/utils/StringUtils.h>

#include "arrow/filesystem/filesystem.h"
#include "arrow/status.h"
#include "arrow/util/logging.h"
#include "arrow/util/print.h"
#include "arrow/util/string_view.h"
35 
36 namespace arrow {
37 namespace fs {
38 namespace internal {
39 
// Helper for ARROW_AWS_ASSIGN_OR_RAISE: evaluates `expr` into a local named
// `outcome_var`; if the outcome is not a success, returns its error converted
// with ErrorToStatus() from the enclosing function, otherwise moves the result
// into `target`.
//
// The expansion is several statements (it may declare `target`), so it cannot
// be used as the unbraced body of an `if` / `for`.
#define ARROW_AWS_ASSIGN_OR_RAISE_IMPL(outcome_var, target, expr) \
  auto outcome_var = (expr);                                      \
  if (!outcome_var.IsSuccess()) {                                 \
    return ErrorToStatus(outcome_var.GetError());                 \
  }                                                               \
  target = std::move(outcome_var).GetResultWithOwnership();

// Pastes two tokens together to build the name of the temporary outcome.
#define ARROW_AWS_ASSIGN_OR_RAISE_NAME(head, tail) ARROW_CONCAT(head, tail)

// Assigns the result of the AWS outcome expression `expr` to `target`, or
// returns the corresponding error Status from the enclosing function.
// __COUNTER__ gives each expansion its own temporary name.
#define ARROW_AWS_ASSIGN_OR_RAISE(target, expr) \
  ARROW_AWS_ASSIGN_OR_RAISE_IMPL(               \
      ARROW_AWS_ASSIGN_OR_RAISE_NAME(_aws_error_or_value, __COUNTER__), target, expr);
52 
53 template <typename Error>
IsConnectError(const Aws::Client::AWSError<Error> & error)54 inline bool IsConnectError(const Aws::Client::AWSError<Error>& error) {
55   if (error.ShouldRetry()) {
56     return true;
57   }
58   // Sometimes Minio may fail with a 503 error
59   // (exception name: XMinioServerNotInitialized,
60   //  message: "Server not initialized, please try again")
61   if (error.GetExceptionName() == "XMinioServerNotInitialized") {
62     return true;
63   }
64   return false;
65 }
66 
IsNotFound(const Aws::Client::AWSError<Aws::S3::S3Errors> & error)67 inline bool IsNotFound(const Aws::Client::AWSError<Aws::S3::S3Errors>& error) {
68   const auto error_type = error.GetErrorType();
69   return (error_type == Aws::S3::S3Errors::NO_SUCH_BUCKET ||
70           error_type == Aws::S3::S3Errors::RESOURCE_NOT_FOUND);
71 }
72 
IsAlreadyExists(const Aws::Client::AWSError<Aws::S3::S3Errors> & error)73 inline bool IsAlreadyExists(const Aws::Client::AWSError<Aws::S3::S3Errors>& error) {
74   const auto error_type = error.GetErrorType();
75   return (error_type == Aws::S3::S3Errors::BUCKET_ALREADY_EXISTS ||
76           error_type == Aws::S3::S3Errors::BUCKET_ALREADY_OWNED_BY_YOU);
77 }
78 
79 // TODO qualify error messages with a prefix indicating context
80 // (e.g. "When completing multipart upload to bucket 'xxx', key 'xxx': ...")
81 template <typename ErrorType>
ErrorToStatus(const std::string & prefix,const Aws::Client::AWSError<ErrorType> & error)82 Status ErrorToStatus(const std::string& prefix,
83                      const Aws::Client::AWSError<ErrorType>& error) {
84   // XXX Handle fine-grained error types
85   // See
86   // https://sdk.amazonaws.com/cpp/api/LATEST/namespace_aws_1_1_s3.html#ae3f82f8132b619b6e91c88a9f1bde371
87   return Status::IOError(prefix, "AWS Error [code ",
88                          static_cast<int>(error.GetErrorType()),
89                          "]: ", error.GetMessage());
90 }
91 
92 template <typename ErrorType, typename... Args>
ErrorToStatus(const std::tuple<Args &...> & prefix,const Aws::Client::AWSError<ErrorType> & error)93 Status ErrorToStatus(const std::tuple<Args&...>& prefix,
94                      const Aws::Client::AWSError<ErrorType>& error) {
95   std::stringstream ss;
96   ::arrow::internal::PrintTuple(&ss, prefix);
97   return ErrorToStatus(ss.str(), error);
98 }
99 
100 template <typename ErrorType>
ErrorToStatus(const Aws::Client::AWSError<ErrorType> & error)101 Status ErrorToStatus(const Aws::Client::AWSError<ErrorType>& error) {
102   return ErrorToStatus(std::string(), error);
103 }
104 
105 template <typename AwsResult, typename Error>
OutcomeToStatus(const std::string & prefix,const Aws::Utils::Outcome<AwsResult,Error> & outcome)106 Status OutcomeToStatus(const std::string& prefix,
107                        const Aws::Utils::Outcome<AwsResult, Error>& outcome) {
108   if (outcome.IsSuccess()) {
109     return Status::OK();
110   } else {
111     return ErrorToStatus(prefix, outcome.GetError());
112   }
113 }
114 
115 template <typename AwsResult, typename Error, typename... Args>
OutcomeToStatus(const std::tuple<Args &...> & prefix,const Aws::Utils::Outcome<AwsResult,Error> & outcome)116 Status OutcomeToStatus(const std::tuple<Args&...>& prefix,
117                        const Aws::Utils::Outcome<AwsResult, Error>& outcome) {
118   if (outcome.IsSuccess()) {
119     return Status::OK();
120   } else {
121     return ErrorToStatus(prefix, outcome.GetError());
122   }
123 }
124 
125 template <typename AwsResult, typename Error>
OutcomeToStatus(const Aws::Utils::Outcome<AwsResult,Error> & outcome)126 Status OutcomeToStatus(const Aws::Utils::Outcome<AwsResult, Error>& outcome) {
127   return OutcomeToStatus(std::string(), outcome);
128 }
129 
130 template <typename AwsResult, typename Error>
OutcomeToResult(Aws::Utils::Outcome<AwsResult,Error> outcome)131 Result<AwsResult> OutcomeToResult(Aws::Utils::Outcome<AwsResult, Error> outcome) {
132   if (outcome.IsSuccess()) {
133     return std::move(outcome).GetResultWithOwnership();
134   } else {
135     return ErrorToStatus(outcome.GetError());
136   }
137 }
138 
ToAwsString(const std::string & s)139 inline Aws::String ToAwsString(const std::string& s) {
140   // Direct construction of Aws::String from std::string doesn't work because
141   // it uses a specific Allocator class.
142   return Aws::String(s.begin(), s.end());
143 }
144 
FromAwsString(const Aws::String & s)145 inline util::string_view FromAwsString(const Aws::String& s) {
146   return {s.data(), s.length()};
147 }
148 
ToURLEncodedAwsString(const std::string & s)149 inline Aws::String ToURLEncodedAwsString(const std::string& s) {
150   return Aws::Utils::StringUtils::URLEncode(s.data());
151 }
152 
FromAwsDatetime(const Aws::Utils::DateTime & dt)153 inline TimePoint FromAwsDatetime(const Aws::Utils::DateTime& dt) {
154   return std::chrono::time_point_cast<std::chrono::nanoseconds>(dt.UnderlyingTimestamp());
155 }
156 
157 // A connect retry strategy with a controlled max duration.
158 
159 class ConnectRetryStrategy : public Aws::Client::RetryStrategy {
160  public:
161   static const int32_t kDefaultRetryInterval = 200;     /* milliseconds */
162   static const int32_t kDefaultMaxRetryDuration = 6000; /* milliseconds */
163 
164   explicit ConnectRetryStrategy(int32_t retry_interval = kDefaultRetryInterval,
165                                 int32_t max_retry_duration = kDefaultMaxRetryDuration)
retry_interval_(retry_interval)166       : retry_interval_(retry_interval), max_retry_duration_(max_retry_duration) {}
167 
ShouldRetry(const Aws::Client::AWSError<Aws::Client::CoreErrors> & error,long attempted_retries)168   bool ShouldRetry(const Aws::Client::AWSError<Aws::Client::CoreErrors>& error,
169                    long attempted_retries) const override {  // NOLINT
170     if (!IsConnectError(error)) {
171       // Not a connect error, don't retry
172       return false;
173     }
174     return attempted_retries * retry_interval_ < max_retry_duration_;
175   }
176 
CalculateDelayBeforeNextRetry(const Aws::Client::AWSError<Aws::Client::CoreErrors> & error,long attempted_retries)177   long CalculateDelayBeforeNextRetry(  // NOLINT
178       const Aws::Client::AWSError<Aws::Client::CoreErrors>& error,
179       long attempted_retries) const override {  // NOLINT
180     return retry_interval_;
181   }
182 
183  protected:
184   int32_t retry_interval_;
185   int32_t max_retry_duration_;
186 };
187 
188 }  // namespace internal
189 }  // namespace fs
190 }  // namespace arrow
191