1 // Licensed to the Apache Software Foundation (ASF) under one
2 // or more contributor license agreements.  See the NOTICE file
3 // distributed with this work for additional information
4 // regarding copyright ownership.  The ASF licenses this file
5 // to you under the Apache License, Version 2.0 (the
6 // "License"); you may not use this file except in compliance
7 // with the License.  You may obtain a copy of the License at
8 //
9 //   http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing,
12 // software distributed under the License is distributed on an
13 // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14 // KIND, either express or implied.  See the License for the
15 // specific language governing permissions and limitations
16 // under the License.
17 
18 #pragma once
19 
20 #include <sstream>
21 #include <string>
22 #include <tuple>
23 #include <utility>
24 
25 #include <aws/core/Aws.h>
26 #include <aws/core/client/RetryStrategy.h>
27 #include <aws/core/http/HttpTypes.h>
28 #include <aws/core/utils/DateTime.h>
29 #include <aws/core/utils/StringUtils.h>
30 
31 #include "arrow/filesystem/filesystem.h"
32 #include "arrow/status.h"
33 #include "arrow/util/logging.h"
34 #include "arrow/util/print.h"
35 #include "arrow/util/string_view.h"
36 
37 namespace arrow {
38 namespace fs {
39 namespace internal {
40 
// Implementation detail of ARROW_AWS_ASSIGN_OR_RAISE.
//
// Evaluates `rexpr` once into a local variable named `outcome_name`.  If the
// outcome is not successful, the enclosing function returns the Status built
// by ErrorToStatus() from the outcome's error.  Otherwise the result is moved
// out of the outcome and assigned to `lhs`.
//
// The expansion is several statements, not a single expression, so it cannot
// be used as the unbraced body of an `if` / `for` / `while`.
#define ARROW_AWS_ASSIGN_OR_RAISE_IMPL(outcome_name, lhs, rexpr) \
  auto outcome_name = (rexpr);                                   \
  if (!outcome_name.IsSuccess()) {                               \
    return ErrorToStatus(outcome_name.GetError());               \
  }                                                              \
  lhs = std::move(outcome_name).GetResultWithOwnership();

// Builds the name of the temporary outcome variable by pasting `x` and `y`
// together through ARROW_CONCAT.
#define ARROW_AWS_ASSIGN_OR_RAISE_NAME(x, y) ARROW_CONCAT(x, y)

// Assigns the result of the AWS outcome expression `rexpr` to `lhs`, or
// returns an error Status from the enclosing function if the outcome failed.
// __COUNTER__ is appended to the temporary's name so that each expansion
// introduces a differently named local variable.
#define ARROW_AWS_ASSIGN_OR_RAISE(lhs, rexpr) \
  ARROW_AWS_ASSIGN_OR_RAISE_IMPL(             \
      ARROW_AWS_ASSIGN_OR_RAISE_NAME(_aws_error_or_value, __COUNTER__), lhs, rexpr);
53 
// Kind of S3-compatible server being talked to.
// XXX Should we expose this at some point?
enum class S3Backend {
  Amazon,  // Amazon S3
  Minio,   // MinIO server
  Other    // anything else, including an unrecognized backend
};
56 
57 // Detect the S3 backend type from the S3 server's response headers
DetectS3Backend(const Aws::Http::HeaderValueCollection & headers)58 S3Backend DetectS3Backend(const Aws::Http::HeaderValueCollection& headers) {
59   const auto it = headers.find("server");
60   if (it != headers.end()) {
61     const auto& value = util::string_view(it->second);
62     if (value.find("AmazonS3") != std::string::npos) {
63       return S3Backend::Amazon;
64     }
65     if (value.find("MinIO") != std::string::npos) {
66       return S3Backend::Minio;
67     }
68   }
69   return S3Backend::Other;
70 }
71 
72 template <typename Error>
DetectS3Backend(const Aws::Client::AWSError<Error> & error)73 S3Backend DetectS3Backend(const Aws::Client::AWSError<Error>& error) {
74   return DetectS3Backend(error.GetResponseHeaders());
75 }
76 
77 template <typename Error>
IsConnectError(const Aws::Client::AWSError<Error> & error)78 inline bool IsConnectError(const Aws::Client::AWSError<Error>& error) {
79   if (error.ShouldRetry()) {
80     return true;
81   }
82   // Sometimes Minio may fail with a 503 error
83   // (exception name: XMinioServerNotInitialized,
84   //  message: "Server not initialized, please try again")
85   if (error.GetExceptionName() == "XMinioServerNotInitialized") {
86     return true;
87   }
88   return false;
89 }
90 
IsNotFound(const Aws::Client::AWSError<Aws::S3::S3Errors> & error)91 inline bool IsNotFound(const Aws::Client::AWSError<Aws::S3::S3Errors>& error) {
92   const auto error_type = error.GetErrorType();
93   return (error_type == Aws::S3::S3Errors::NO_SUCH_BUCKET ||
94           error_type == Aws::S3::S3Errors::RESOURCE_NOT_FOUND);
95 }
96 
IsAlreadyExists(const Aws::Client::AWSError<Aws::S3::S3Errors> & error)97 inline bool IsAlreadyExists(const Aws::Client::AWSError<Aws::S3::S3Errors>& error) {
98   const auto error_type = error.GetErrorType();
99   return (error_type == Aws::S3::S3Errors::BUCKET_ALREADY_EXISTS ||
100           error_type == Aws::S3::S3Errors::BUCKET_ALREADY_OWNED_BY_YOU);
101 }
102 
103 // TODO qualify error messages with a prefix indicating context
104 // (e.g. "When completing multipart upload to bucket 'xxx', key 'xxx': ...")
105 template <typename ErrorType>
ErrorToStatus(const std::string & prefix,const Aws::Client::AWSError<ErrorType> & error)106 Status ErrorToStatus(const std::string& prefix,
107                      const Aws::Client::AWSError<ErrorType>& error) {
108   // XXX Handle fine-grained error types
109   // See
110   // https://sdk.amazonaws.com/cpp/api/LATEST/namespace_aws_1_1_s3.html#ae3f82f8132b619b6e91c88a9f1bde371
111   return Status::IOError(prefix, "AWS Error [code ",
112                          static_cast<int>(error.GetErrorType()),
113                          "]: ", error.GetMessage());
114 }
115 
116 template <typename ErrorType, typename... Args>
ErrorToStatus(const std::tuple<Args &...> & prefix,const Aws::Client::AWSError<ErrorType> & error)117 Status ErrorToStatus(const std::tuple<Args&...>& prefix,
118                      const Aws::Client::AWSError<ErrorType>& error) {
119   std::stringstream ss;
120   ::arrow::internal::PrintTuple(&ss, prefix);
121   return ErrorToStatus(ss.str(), error);
122 }
123 
124 template <typename ErrorType>
ErrorToStatus(const Aws::Client::AWSError<ErrorType> & error)125 Status ErrorToStatus(const Aws::Client::AWSError<ErrorType>& error) {
126   return ErrorToStatus(std::string(), error);
127 }
128 
129 template <typename AwsResult, typename Error>
OutcomeToStatus(const std::string & prefix,const Aws::Utils::Outcome<AwsResult,Error> & outcome)130 Status OutcomeToStatus(const std::string& prefix,
131                        const Aws::Utils::Outcome<AwsResult, Error>& outcome) {
132   if (outcome.IsSuccess()) {
133     return Status::OK();
134   } else {
135     return ErrorToStatus(prefix, outcome.GetError());
136   }
137 }
138 
139 template <typename AwsResult, typename Error, typename... Args>
OutcomeToStatus(const std::tuple<Args &...> & prefix,const Aws::Utils::Outcome<AwsResult,Error> & outcome)140 Status OutcomeToStatus(const std::tuple<Args&...>& prefix,
141                        const Aws::Utils::Outcome<AwsResult, Error>& outcome) {
142   if (outcome.IsSuccess()) {
143     return Status::OK();
144   } else {
145     return ErrorToStatus(prefix, outcome.GetError());
146   }
147 }
148 
149 template <typename AwsResult, typename Error>
OutcomeToStatus(const Aws::Utils::Outcome<AwsResult,Error> & outcome)150 Status OutcomeToStatus(const Aws::Utils::Outcome<AwsResult, Error>& outcome) {
151   return OutcomeToStatus(std::string(), outcome);
152 }
153 
154 template <typename AwsResult, typename Error>
OutcomeToResult(Aws::Utils::Outcome<AwsResult,Error> outcome)155 Result<AwsResult> OutcomeToResult(Aws::Utils::Outcome<AwsResult, Error> outcome) {
156   if (outcome.IsSuccess()) {
157     return std::move(outcome).GetResultWithOwnership();
158   } else {
159     return ErrorToStatus(outcome.GetError());
160   }
161 }
162 
ToAwsString(const std::string & s)163 inline Aws::String ToAwsString(const std::string& s) {
164   // Direct construction of Aws::String from std::string doesn't work because
165   // it uses a specific Allocator class.
166   return Aws::String(s.begin(), s.end());
167 }
168 
FromAwsString(const Aws::String & s)169 inline util::string_view FromAwsString(const Aws::String& s) {
170   return {s.data(), s.length()};
171 }
172 
ToURLEncodedAwsString(const std::string & s)173 inline Aws::String ToURLEncodedAwsString(const std::string& s) {
174   return Aws::Utils::StringUtils::URLEncode(s.data());
175 }
176 
FromAwsDatetime(const Aws::Utils::DateTime & dt)177 inline TimePoint FromAwsDatetime(const Aws::Utils::DateTime& dt) {
178   return std::chrono::time_point_cast<std::chrono::nanoseconds>(dt.UnderlyingTimestamp());
179 }
180 
181 // A connect retry strategy with a controlled max duration.
182 
183 class ConnectRetryStrategy : public Aws::Client::RetryStrategy {
184  public:
185   static const int32_t kDefaultRetryInterval = 200;     /* milliseconds */
186   static const int32_t kDefaultMaxRetryDuration = 6000; /* milliseconds */
187 
188   explicit ConnectRetryStrategy(int32_t retry_interval = kDefaultRetryInterval,
189                                 int32_t max_retry_duration = kDefaultMaxRetryDuration)
retry_interval_(retry_interval)190       : retry_interval_(retry_interval), max_retry_duration_(max_retry_duration) {}
191 
ShouldRetry(const Aws::Client::AWSError<Aws::Client::CoreErrors> & error,long attempted_retries)192   bool ShouldRetry(const Aws::Client::AWSError<Aws::Client::CoreErrors>& error,
193                    long attempted_retries) const override {  // NOLINT
194     if (!IsConnectError(error)) {
195       // Not a connect error, don't retry
196       return false;
197     }
198     return attempted_retries * retry_interval_ < max_retry_duration_;
199   }
200 
CalculateDelayBeforeNextRetry(const Aws::Client::AWSError<Aws::Client::CoreErrors> & error,long attempted_retries)201   long CalculateDelayBeforeNextRetry(  // NOLINT
202       const Aws::Client::AWSError<Aws::Client::CoreErrors>& error,
203       long attempted_retries) const override {  // NOLINT
204     return retry_interval_;
205   }
206 
207  protected:
208   int32_t retry_interval_;
209   int32_t max_retry_duration_;
210 };
211 
212 }  // namespace internal
213 }  // namespace fs
214 }  // namespace arrow
215