1 /* Copyright (C) 2019 MariaDB Corporation
2
3 This program is free software; you can redistribute it and/or
4 modify it under the terms of the GNU General Public License
5 as published by the Free Software Foundation; version 2 of
6 the License.
7
8 This program is distributed in the hope that it will be useful,
9 but WITHOUT ANY WARRANTY; without even the implied warranty of
10 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 GNU General Public License for more details.
12
13 You should have received a copy of the GNU General Public License
14 along with this program; if not, write to the Free Software
15 Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
16 MA 02110-1301, USA. */
17
18
19 #include "S3Storage.h"
20 #include "Config.h"
21 #include <stdlib.h>
22 #include <sys/stat.h>
23 #include <fcntl.h>
24 #include <sys/types.h>
25 #include <boost/filesystem.hpp>
26 #include <iostream>
27 #include <boost/uuid/uuid.hpp>
28 #include <boost/uuid/uuid_io.hpp>
29 #include <boost/uuid/random_generator.hpp>
30 #define BOOST_SPIRIT_THREADSAFE
31 #include <boost/property_tree/ptree.hpp>
32 #include <boost/property_tree/json_parser.hpp>
33 #include "Utilities.h"
34
35 using namespace std;
36
37 namespace storagemanager
38 {
// Returns a lower-cased copy of s; the input is left untouched.
//
// Conversion is done byte-by-byte with the C library's ::tolower(), so only
// characters the current C locale considers upper-case are changed; all other
// bytes (digits, punctuation, bytes >= 0x80 in the "C" locale) pass through.
std::string tolower(const std::string &s)
{
    std::string ret(s);
    // ::tolower() is only defined for EOF and values representable as
    // unsigned char.  Plain char may be signed, so widen through unsigned char
    // first to avoid undefined behaviour on bytes >= 0x80.
    for (char &c : ret)
        c = static_cast<char>(::tolower(static_cast<unsigned char>(c)));
    return ret;
}
46
// Write callback handed to libcurl via CURLOPT_WRITEFUNCTION.
//
// contents : start of the received chunk
// size     : element size
// nmemb    : element count (chunk length is size * nmemb bytes)
// userp    : the std::string registered with CURLOPT_WRITEDATA; the chunk is
//            appended to it
//
// Returns the number of bytes taken, which is always the whole chunk.
static size_t WriteCallback(void *contents, size_t size, size_t nmemb, void *userp)
{
    const size_t chunkBytes = size * nmemb;
    std::string *sink = static_cast<std::string *>(userp);
    sink->append(static_cast<const char *>(contents), chunkBytes);
    return chunkBytes;
}
52
retryable_error(uint8_t s3err)53 inline bool retryable_error(uint8_t s3err)
54 {
55 return (
56 s3err == MS3_ERR_RESPONSE_PARSE ||
57 s3err == MS3_ERR_REQUEST_ERROR ||
58 s3err == MS3_ERR_OOM ||
59 s3err == MS3_ERR_IMPOSSIBLE ||
60 s3err == MS3_ERR_AUTH ||
61 s3err == MS3_ERR_SERVER ||
62 s3err == MS3_ERR_AUTH_ROLE
63 );
64 }
65
// Best-effort mapping from the error codes returned by the ms3 API to Linux
// errno values.  The table is positional: it is indexed directly by the ms3
// error code, so entries must stay in the same order as the codes noted on
// each line.  Can and should be changed if we find better mappings.  Some of
// these are nonsensical or misleading, so we should allow non-errno values to
// be propagated upward.
// NOTE(review): callers index this table with the raw uint8_t returned by the
// library and do no bounds check; a code above 12 would read past the end.
const int s3err_to_errno[] = {
    0,             // 0  no error
    EINVAL,        // 1 MS3_ERR_PARAMETER.  Best effort.  D'oh.
    ENODATA,       // 2 MS3_ERR_NO_DATA
    ENAMETOOLONG,  // 3 MS3_ERR_URI_TOO_LONG
    EBADMSG,       // 4 MS3_ERR_RESPONSE_PARSE
    ECOMM,         // 5 MS3_ERR_REQUEST_ERROR
    ENOMEM,        // 6 MS3_ERR_OOM
    EINVAL,        // 7 MS3_ERR_IMPOSSIBLE.  Will have to look through the code to find out what this is exactly.
    EKEYREJECTED,  // 8 MS3_ERR_AUTH
    ENOENT,        // 9 MS3_ERR_NOT_FOUND
    EPROTO,        // 10 MS3_ERR_SERVER
    EMSGSIZE,      // 11 MS3_ERR_TOO_BIG
    EKEYREJECTED   // 12 MS3_ERR_AUTH_ROLE
};
84
// Human-readable text for each ms3 error code, used in log messages when the
// server did not supply its own error string.  Positional like
// s3err_to_errno: the index is the ms3 error code, and the code names below
// are the ones recorded on the matching s3err_to_errno entries.
// NOTE(review): indexed by callers without a bounds check, same as s3err_to_errno.
const char *s3err_msgs[] = {
    "All is well",                                              // 0
    "A required function parameter is missing",                 // 1  MS3_ERR_PARAMETER
    "No data is supplied to a function that requires data",     // 2  MS3_ERR_NO_DATA
    "The generated URI for the request is too long",            // 3  MS3_ERR_URI_TOO_LONG
    "The API could not parse the response",                     // 4  MS3_ERR_RESPONSE_PARSE
    "The API could not send the request",                       // 5  MS3_ERR_REQUEST_ERROR
    "Could not allocate required memory",                       // 6  MS3_ERR_OOM
    "An impossible condition occurred, how unlucky are you?",   // 7  MS3_ERR_IMPOSSIBLE
    "Authentication failed",                                    // 8  MS3_ERR_AUTH
    "Object not found",                                         // 9  MS3_ERR_NOT_FOUND
    "Unknown error code in response",                           // 10 MS3_ERR_SERVER
    "Data to PUT is too large; 4GB maximum length",             // 11 MS3_ERR_TOO_BIG
    "Authentication failed, token has expired"                  // 12 MS3_ERR_AUTH_ROLE
};
100
101
ScopedConnection(S3Storage * s,ms3_st * m)102 S3Storage::ScopedConnection::ScopedConnection(S3Storage *s, ms3_st *m) : s3(s), conn(m)
103 {
104 assert(conn);
105 }
106
~ScopedConnection()107 S3Storage::ScopedConnection::~ScopedConnection()
108 {
109 s3->returnConnection(conn);
110 }
111
S3Storage(bool skipRetry)112 S3Storage::S3Storage(bool skipRetry) : skipRetryableErrors(skipRetry)
113 {
114 /* Check creds from envvars
115 Get necessary vars from config
116 Init an ms3_st object
117 */
118 Config *config = Config::get();
119 const char *keyerr = "S3 access requires setting AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY env vars, "
120 " or setting aws_access_key_id and aws_secret_access_key, or configure an IAM role with ec2_iam_mode=enabled."
121 " Check storagemanager.cnf file for more information.";
122 key = config->getValue("S3", "aws_access_key_id");
123 secret = config->getValue("S3", "aws_secret_access_key");
124 IAMrole = config->getValue("S3", "iam_role_name");
125 STSendpoint = config->getValue("S3", "sts_endpoint");
126 STSregion = config->getValue("S3", "sts_region");
127 string ec2_mode = tolower(config->getValue("S3", "ec2_iam_mode"));
128 string use_http = tolower(config->getValue("S3", "use_http"));
129 string ssl_verify = tolower(config->getValue("S3", "ssl_verify"));
130 string port_number = config->getValue("S3", "port_number");
131
132
133 bool keyMissing = false;
134 isEC2Instance = false;
135 ec2iamEnabled = false;
136 useHTTP = false;
137 sslVerify = true;
138 portNumber = 0;
139
140 if (!port_number.empty())
141 {
142 portNumber = stoi(port_number);
143 }
144
145 if (ec2_mode == "enabled")
146 {
147 ec2iamEnabled = true;
148 }
149
150 if (use_http == "enabled")
151 {
152 useHTTP = true;
153 }
154
155 if (ssl_verify == "disabled")
156 {
157 sslVerify = false;
158 }
159
160 if (key.empty())
161 {
162 char *_key_id = getenv("AWS_ACCESS_KEY_ID");
163 if (!_key_id)
164 {
165 keyMissing = true;
166 }
167 else
168 key = _key_id;
169 }
170 if (secret.empty())
171 {
172 char *_secret_id = getenv("AWS_SECRET_ACCESS_KEY");
173 if (!_secret_id)
174 {
175 keyMissing = true;
176 }
177 else
178 secret = _secret_id;
179 }
180
181 // Valid to not have keys configured if running on ec2 instance.
182 // Attempt to get those credentials from instance.
183 if (keyMissing)
184 {
185 if (ec2iamEnabled)
186 {
187 getIAMRoleFromMetadataEC2();
188 }
189 if (!IAMrole.empty() && getCredentialsFromMetadataEC2())
190 {
191 isEC2Instance = true;
192 }
193 else
194 {
195 logger->log(LOG_ERR, keyerr);
196 throw runtime_error(keyerr);
197 }
198 }
199
200 region = config->getValue("S3", "region");
201 bucket = config->getValue("S3", "bucket");
202 prefix = config->getValue("S3", "prefix");
203 if (bucket.empty())
204 {
205 const char *msg = "S3 access requires setting S3/bucket in storagemanager.cnf";
206 logger->log(LOG_ERR, msg);
207 throw runtime_error(msg);
208 }
209
210 endpoint = config->getValue("S3", "endpoint");
211
212 ms3_library_init();
213 //ms3_debug();
214 testConnectivityAndPerms();
215 }
216
~S3Storage()217 S3Storage::~S3Storage()
218 {
219 for (auto &conn : freeConns)
220 ms3_deinit(conn.conn);
221 ms3_library_deinit();
222 }
223
// Convenience macro for testConnectivityAndPerms().
// Logs "S3Storage: failed to <OP>, check log files for specific error" at
// LOG_ERR and throws runtime_error carrying the same text.  OP is stringized,
// so it is passed as a bare token, e.g. FAIL(PUT).  The expansion is a
// complete brace-enclosed block, which is why the call sites are written
// without a trailing semicolon.  It expects a `logger` to be in scope.
#define FAIL(OP) { \
    const char *msg = "S3Storage: failed to " #OP ", check log files for specific error"; \
    logger->log(LOG_ERR, msg); \
    throw runtime_error(msg); \
}
230
231
getIAMRoleFromMetadataEC2()232 bool S3Storage::getIAMRoleFromMetadataEC2()
233 {
234 CURL *curl = NULL;
235 CURLcode curl_res;
236 string readBuffer;
237 string instanceMetadata = "http://169.254.169.254/latest/meta-data/iam/security-credentials/";
238 curl = curl_easy_init();
239 curl_easy_setopt(curl, CURLOPT_URL, instanceMetadata.c_str());
240 curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, WriteCallback);
241 curl_easy_setopt(curl, CURLOPT_WRITEDATA, &readBuffer);
242 curl_res = curl_easy_perform(curl);
243 if (curl_res != CURLE_OK)
244 {
245 logger->log(LOG_ERR, "CURL fail %u",curl_res);
246 return false;
247 }
248 IAMrole = readBuffer;
249 //logger->log(LOG_INFO, "S3Storage: IAM Role Name = %s",IAMrole.c_str());
250
251 return true;
252 }
253
getCredentialsFromMetadataEC2()254 bool S3Storage::getCredentialsFromMetadataEC2()
255 {
256 CURL *curl = NULL;
257 CURLcode curl_res;
258 std::string readBuffer;
259 string instanceMetadata = "http://169.254.169.254/latest/meta-data/iam/security-credentials/" + IAMrole;
260 curl = curl_easy_init();
261 curl_easy_setopt(curl, CURLOPT_URL, instanceMetadata.c_str());
262 curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, WriteCallback);
263 curl_easy_setopt(curl, CURLOPT_WRITEDATA, &readBuffer);
264 curl_res = curl_easy_perform(curl);
265 if (curl_res != CURLE_OK)
266 {
267 logger->log(LOG_ERR, "CURL fail %u",curl_res);
268 return false;
269 }
270 stringstream credentials(readBuffer);
271 boost::property_tree::ptree pt;
272 boost::property_tree::read_json(credentials, pt);
273 key = pt.get<string>("AccessKeyId");
274 secret = pt.get<string>("SecretAccessKey");
275 token = pt.get<string>("Token");
276 //logger->log(LOG_INFO, "S3Storage: key = %s secret = %s token = %s",key.c_str(),secret.c_str(),token.c_str());
277
278 return true;
279 }
280
testConnectivityAndPerms()281 void S3Storage::testConnectivityAndPerms()
282 {
283 boost::shared_array<uint8_t> testObj(new uint8_t[1]);
284 testObj[0] = 0;
285 boost::uuids::uuid u = boost::uuids::random_generator()();
286 ostringstream oss;
287 oss << u << "connectivity_test";
288
289 string testObjKey = oss.str();
290
291 int err = putObject(testObj, 1, testObjKey);
292 if (err)
293 FAIL(PUT)
294 bool _exists;
295 err = exists(testObjKey, &_exists);
296 if (err)
297 FAIL(HEAD)
298 size_t len;
299 err = getObject(testObjKey, &testObj, &len);
300 if (err)
301 FAIL(GET)
302 err = deleteObject(testObjKey);
303 if (err)
304 FAIL(DELETE)
305 logger->log(LOG_INFO, "S3Storage: S3 connectivity & permissions are OK");
306 }
307
getObject(const string & sourceKey,const string & destFile,size_t * size)308 int S3Storage::getObject(const string &sourceKey, const string &destFile, size_t *size)
309 {
310 int fd, err;
311 boost::shared_array<uint8_t> data;
312 size_t len, count = 0;
313 char buf[80];
314
315 err = getObject(sourceKey, &data, &len);
316 if (err)
317 return err;
318
319 fd = ::open(destFile.c_str(), O_WRONLY | O_CREAT | O_TRUNC, 0600);
320 if (fd < 0)
321 {
322 int l_errno = errno;
323 logger->log(LOG_ERR, "S3Storage::getObject(): Failed to open %s, got %s", destFile.c_str(),
324 strerror_r(l_errno, buf, 80));
325 errno = l_errno;
326 return err;
327 }
328 ScopedCloser s(fd);
329 while (count < len)
330 {
331 err = ::write(fd, &data[count], len - count);
332 if (err < 0)
333 {
334 int l_errno = errno;
335 logger->log(LOG_ERR, "S3Storage::getObject(): Failed to write to %s, got %s", destFile.c_str(),
336 strerror_r(errno, buf, 80));
337 errno = l_errno;
338 return -1;
339 }
340 count += err;
341 }
342 if (size)
343 *size = len;
344 return 0;
345 }
346
getObject(const string & _sourceKey,boost::shared_array<uint8_t> * data,size_t * size)347 int S3Storage::getObject(const string &_sourceKey, boost::shared_array<uint8_t> *data, size_t *size)
348 {
349 uint8_t err;
350 size_t len = 0;
351 uint8_t *_data = NULL;
352 string sourceKey = prefix + _sourceKey;
353
354 ms3_st *creds = getConnection();
355 if (!creds)
356 {
357 logger->log(LOG_ERR, "S3Storage::getObject(): failed to GET, S3Storage::getConnection() returned NULL on init");
358 errno = EINVAL;
359 return -1;
360 }
361 ScopedConnection sc(this, creds);
362
363 do {
364 err = ms3_get(creds, bucket.c_str(), sourceKey.c_str(), &_data, &len);
365 if (err && (!skipRetryableErrors && retryable_error(err)))
366 {
367 if (ms3_server_error(creds))
368 logger->log(LOG_WARNING, "S3Storage::getObject(): failed to GET, server says '%s'. bucket = %s, key = %s."
369 " Retrying...", ms3_server_error(creds), bucket.c_str(), sourceKey.c_str());
370 else
371 logger->log(LOG_WARNING, "S3Storage::getObject(): failed to GET, got '%s'. bucket = %s, key = %s. Retrying...",
372 s3err_msgs[err], bucket.c_str(), sourceKey.c_str());
373 if (ec2iamEnabled)
374 {
375 getIAMRoleFromMetadataEC2();
376 getCredentialsFromMetadataEC2();
377 ms3_ec2_set_cred(creds,IAMrole.c_str(),key.c_str(),secret.c_str(),token.c_str());
378 }
379 else if(!IAMrole.empty())
380 {
381 ms3_assume_role(creds);
382 }
383 sleep(5);
384 }
385 } while (err && (!skipRetryableErrors && retryable_error(err)));
386 if (err)
387 {
388 if (ms3_server_error(creds))
389 logger->log(LOG_ERR, "S3Storage::getObject(): failed to GET, server says '%s'. bucket = %s, key = %s.",
390 ms3_server_error(creds), bucket.c_str(), sourceKey.c_str());
391 else
392 logger->log(LOG_ERR, "S3Storage::getObject(): failed to GET, got '%s'. bucket = %s, key = %s.",
393 s3err_msgs[err], bucket.c_str(), sourceKey.c_str());
394 data->reset();
395 errno = s3err_to_errno[err];
396 return -1;
397 }
398
399 data->reset(_data, free);
400 if (size)
401 *size = len;
402 return 0;
403 }
404
405
putObject(const string & sourceFile,const string & destKey)406 int S3Storage::putObject(const string &sourceFile, const string &destKey)
407 {
408 boost::shared_array<uint8_t> data;
409 int err, fd;
410 size_t len, count = 0;
411 char buf[80];
412 boost::system::error_code boost_err;
413
414 len = boost::filesystem::file_size(sourceFile, boost_err);
415 if (boost_err)
416 {
417 errno = boost_err.value();
418 return -1;
419 }
420 data.reset(new uint8_t[len]);
421 fd = ::open(sourceFile.c_str(), O_RDONLY);
422 if (fd < 0)
423 {
424 int l_errno = errno;
425 logger->log(LOG_ERR, "S3Storage::putObject(): Failed to open %s, got %s", sourceFile.c_str(),
426 strerror_r(l_errno, buf, 80));
427 errno = l_errno;
428 return -1;
429 }
430 ScopedCloser s(fd);
431 while (count < len)
432 {
433 err = ::read(fd, &data[count], len - count);
434 if (err < 0)
435 {
436 int l_errno = errno;
437 logger->log(LOG_ERR, "S3Storage::putObject(): Failed to read %s @ position %ld, got %s", sourceFile.c_str(),
438 count, strerror_r(l_errno, buf, 80));
439 errno = l_errno;
440 return -1;
441 }
442 else if (err == 0)
443 {
444 // this shouldn't happen, we just checked the size
445 logger->log(LOG_ERR, "S3Storage::putObject(): Got early EOF reading %s @ position %ld", sourceFile.c_str(),
446 count);
447 errno = ENODATA; // is there a better errno for early eof?
448 return -1;
449 }
450 count += err;
451 }
452
453 return putObject(data, len, destKey);
454 }
455
putObject(const boost::shared_array<uint8_t> data,size_t len,const string & _destKey)456 int S3Storage::putObject(const boost::shared_array<uint8_t> data, size_t len, const string &_destKey)
457 {
458 string destKey = prefix + _destKey;
459 uint8_t s3err;
460 ms3_st *creds = getConnection();
461 if (!creds)
462 {
463 logger->log(LOG_ERR, "S3Storage::putObject(): failed to PUT, S3Storage::getConnection() returned NULL on init");
464 errno = EINVAL;
465 return -1;
466 }
467 ScopedConnection sc(this, creds);
468
469 do {
470 s3err = ms3_put(creds, bucket.c_str(), destKey.c_str(), data.get(), len);
471 if (s3err && (!skipRetryableErrors && retryable_error(s3err)))
472 {
473 if (ms3_server_error(creds))
474 logger->log(LOG_WARNING, "S3Storage::putObject(): failed to PUT, server says '%s'. bucket = %s, key = %s."
475 " Retrying...", ms3_server_error(creds), bucket.c_str(), destKey.c_str());
476 else
477 logger->log(LOG_WARNING, "S3Storage::putObject(): failed to PUT, got '%s'. bucket = %s, key = %s."
478 " Retrying...", s3err_msgs[s3err], bucket.c_str(), destKey.c_str());
479 if (ec2iamEnabled)
480 {
481 getIAMRoleFromMetadataEC2();
482 getCredentialsFromMetadataEC2();
483 ms3_ec2_set_cred(creds,IAMrole.c_str(),key.c_str(),secret.c_str(),token.c_str());
484 }
485 else if(!IAMrole.empty())
486 {
487 ms3_assume_role(creds);
488 }
489 sleep(5);
490 }
491 } while (s3err && (!skipRetryableErrors && retryable_error(s3err)));
492 if (s3err)
493 {
494 if (ms3_server_error(creds))
495 logger->log(LOG_ERR, "S3Storage::putObject(): failed to PUT, server says '%s'. bucket = %s, key = %s.",
496 ms3_server_error(creds), bucket.c_str(), destKey.c_str());
497 else
498 logger->log(LOG_ERR, "S3Storage::putObject(): failed to PUT, got '%s'. bucket = %s, key = %s.",
499 s3err_msgs[s3err], bucket.c_str(), destKey.c_str());
500 errno = s3err_to_errno[s3err];
501 return -1;
502 }
503 return 0;
504 }
505
deleteObject(const string & _key)506 int S3Storage::deleteObject(const string &_key)
507 {
508 uint8_t s3err;
509 string deleteKey = prefix + _key;
510 ms3_st *creds = getConnection();
511 if (!creds)
512 {
513 logger->log(LOG_ERR, "S3Storage::deleteObject(): failed to DELETE, S3Storage::getConnection() returned NULL on init");
514 errno = EINVAL;
515 return -1;
516 }
517 ScopedConnection sc(this, creds);
518
519 do {
520 s3err = ms3_delete(creds, bucket.c_str(), deleteKey.c_str());
521 if (s3err && s3err != MS3_ERR_NOT_FOUND && (!skipRetryableErrors && retryable_error(s3err)))
522 {
523 if (ms3_server_error(creds))
524 logger->log(LOG_WARNING, "S3Storage::deleteObject(): failed to DELETE, server says '%s'. bucket = %s, key = %s."
525 " Retrying...", ms3_server_error(creds), bucket.c_str(), deleteKey.c_str());
526 else
527 logger->log(LOG_WARNING, "S3Storage::deleteObject(): failed to DELETE, got '%s'. bucket = %s, key = %s. Retrying...",
528 s3err_msgs[s3err], bucket.c_str(), deleteKey.c_str());
529 if (ec2iamEnabled)
530 {
531 getIAMRoleFromMetadataEC2();
532 getCredentialsFromMetadataEC2();
533 ms3_ec2_set_cred(creds,IAMrole.c_str(),key.c_str(),secret.c_str(),token.c_str());
534 }
535 else if(!IAMrole.empty())
536 {
537 ms3_assume_role(creds);
538 }
539 sleep(5);
540 }
541 } while (s3err && s3err != MS3_ERR_NOT_FOUND && (!skipRetryableErrors && retryable_error(s3err)));
542
543 if (s3err != 0 && s3err != MS3_ERR_NOT_FOUND)
544 {
545 if (ms3_server_error(creds))
546 logger->log(LOG_ERR, "S3Storage::deleteObject(): failed to DELETE, server says '%s'. bucket = %s, key = %s.",
547 ms3_server_error(creds), bucket.c_str(), deleteKey.c_str());
548 else
549 logger->log(LOG_ERR, "S3Storage::deleteObject(): failed to DELETE, got '%s'. bucket = %s, key = %s.",
550 s3err_msgs[s3err], bucket.c_str(), deleteKey.c_str());
551 return -1;
552 }
553 return 0;
554 }
555
copyObject(const string & _sourceKey,const string & _destKey)556 int S3Storage::copyObject(const string &_sourceKey, const string &_destKey)
557 {
558 string sourceKey = prefix + _sourceKey, destKey = prefix + _destKey;
559 uint8_t s3err;
560 ms3_st *creds = getConnection();
561 if (!creds)
562 {
563 logger->log(LOG_ERR, "S3Storage::copyObject(): failed to copy, S3Storage::getConnection() returned NULL on init");
564 errno = EINVAL;
565 return -1;
566 }
567 ScopedConnection sc(this, creds);
568
569 do
570 {
571 s3err = ms3_copy(creds, bucket.c_str(), sourceKey.c_str(), bucket.c_str(), destKey.c_str());
572 if (s3err && (!skipRetryableErrors && retryable_error(s3err)))
573 {
574 if (ms3_server_error(creds))
575 logger->log(LOG_WARNING, "S3Storage::copyObject(): failed to copy, server says '%s'. bucket = %s, srckey = %s, "
576 "destkey = %s. Retrying...", ms3_server_error(creds), bucket.c_str(), sourceKey.c_str(), destKey.c_str());
577 else
578 logger->log(LOG_WARNING, "S3Storage::copyObject(): failed to copy, got '%s'. bucket = %s, srckey = %s, "
579 " destkey = %s. Retrying...", s3err_msgs[s3err], bucket.c_str(), sourceKey.c_str(), destKey.c_str());
580 if (ec2iamEnabled)
581 {
582 getIAMRoleFromMetadataEC2();
583 getCredentialsFromMetadataEC2();
584 ms3_ec2_set_cred(creds,IAMrole.c_str(),key.c_str(),secret.c_str(),token.c_str());
585 }
586 else if(!IAMrole.empty())
587 {
588 ms3_assume_role(creds);
589 }
590 sleep(5);
591 }
592 } while (s3err && (!skipRetryableErrors && retryable_error(s3err)));
593
594 if (s3err)
595 {
596 // added the add'l check MS3_ERR_NOT_FOUND to suppress error msgs for a legitimate case in IOC::copyFile()
597 if (ms3_server_error(creds) && s3err != MS3_ERR_NOT_FOUND)
598 logger->log(LOG_ERR, "S3Storage::copyObject(): failed to copy, server says '%s'. bucket = %s, srckey = %s, "
599 "destkey = %s.", ms3_server_error(creds), bucket.c_str(), sourceKey.c_str(), destKey.c_str());
600 else if (s3err != MS3_ERR_NOT_FOUND)
601 logger->log(LOG_ERR, "S3Storage::copyObject(): failed to copy, got '%s'. bucket = %s, srckey = %s, "
602 "destkey = %s.", s3err_msgs[s3err], bucket.c_str(), sourceKey.c_str(), destKey.c_str());
603 errno = s3err_to_errno[s3err];
604 return -1;
605 }
606 return 0;
607
608 #if 0
609 // no s3-s3 copy yet. get & put for now.
610
611 int err;
612 boost::shared_array<uint8_t> data;
613 size_t len;
614 err = getObject(sourceKey, &data, &len);
615 if (err)
616 return err;
617 return putObject(data, len, destKey);
618 #endif
619 }
620
exists(const string & _key,bool * out)621 int S3Storage::exists(const string &_key, bool *out)
622 {
623 string existsKey = prefix + _key;
624 uint8_t s3err;
625 ms3_status_st status;
626 ms3_st *creds = getConnection();
627 if (!creds)
628 {
629 logger->log(LOG_ERR, "S3Storage::exists(): failed to HEAD, S3Storage::getConnection() returned NULL on init");
630 errno = EINVAL;
631 return -1;
632 }
633 ScopedConnection sc(this, creds);
634
635 do {
636 s3err = ms3_status(creds, bucket.c_str(), existsKey.c_str(), &status);
637 if (s3err && s3err != MS3_ERR_NOT_FOUND && (!skipRetryableErrors && retryable_error(s3err)))
638 {
639 if (ms3_server_error(creds))
640 logger->log(LOG_WARNING, "S3Storage::exists(): failed to HEAD, server says '%s'. bucket = %s, key = %s."
641 " Retrying...", ms3_server_error(creds), bucket.c_str(), existsKey.c_str());
642 else
643 logger->log(LOG_WARNING, "S3Storage::exists(): failed to HEAD, got '%s'. bucket = %s, key = %s. Retrying...",
644 s3err_msgs[s3err], bucket.c_str(), existsKey.c_str());
645 if (ec2iamEnabled)
646 {
647 getIAMRoleFromMetadataEC2();
648 getCredentialsFromMetadataEC2();
649 ms3_ec2_set_cred(creds,IAMrole.c_str(),key.c_str(),secret.c_str(),token.c_str());
650 }
651 else if(!IAMrole.empty())
652 {
653 ms3_assume_role(creds);
654 }
655 sleep(5);
656 }
657 } while (s3err && s3err != MS3_ERR_NOT_FOUND && (!skipRetryableErrors && retryable_error(s3err)));
658
659 if (s3err != 0 && s3err != MS3_ERR_NOT_FOUND)
660 {
661 if (ms3_server_error(creds))
662 logger->log(LOG_ERR, "S3Storage::exists(): failed to HEAD, server says '%s'. bucket = %s, key = %s.",
663 ms3_server_error(creds), bucket.c_str(), existsKey.c_str());
664 else
665 logger->log(LOG_ERR, "S3Storage::exists(): failed to HEAD, got '%s'. bucket = %s, key = %s.",
666 s3err_msgs[s3err], bucket.c_str(), existsKey.c_str());
667 errno = s3err_to_errno[s3err];
668 return -1;
669 }
670
671 *out = (s3err == 0);
672 return 0;
673 }
674
getConnection()675 ms3_st * S3Storage::getConnection()
676 {
677 boost::unique_lock<boost::mutex> s(connMutex);
678
679 // prune the list. Most-idle connections are at the back.
680 timespec now;
681 clock_gettime(CLOCK_MONOTONIC_COARSE, &now);
682 while (!freeConns.empty())
683 {
684 Connection &back = freeConns.back();
685 if (back.idleSince.tv_sec + maxIdleSecs <= now.tv_sec)
686 {
687 ms3_deinit(back.conn);
688 //connMutexes.erase(back.conn);
689 back.conn = NULL;
690 freeConns.pop_back();
691 }
692 else
693 break; // everything in front of this entry will not have been idle long enough
694 }
695
696 // get a connection
697 ms3_st *ret = NULL;
698 uint8_t res = 0;
699 if (freeConns.empty())
700 {
701 ret = ms3_init(key.c_str(), secret.c_str(), region.c_str(), (endpoint.empty() ? NULL : endpoint.c_str()));
702 // Something went wrong with libmarias3 init
703 if (ret == NULL)
704 {
705 logger->log(LOG_ERR, "S3Storage::getConnection(): ms3_init returned NULL, no specific info to report");
706 }
707 // Set option for use http instead of https
708 if (useHTTP)
709 {
710 ms3_set_option(ret, MS3_OPT_USE_HTTP, NULL);
711 }
712 // Set option to disable SSL Verification
713 if (!sslVerify)
714 {
715 ms3_set_option(ret, MS3_OPT_DISABLE_SSL_VERIFY, NULL);
716 }
717 // Port number is not 0 so it was set by cnf file
718 if (portNumber != 0)
719 {
720 ms3_set_option(ret, MS3_OPT_PORT_NUMBER, &portNumber);
721 }
722 // IAM role setup for keys
723 if (!IAMrole.empty())
724 {
725 if (isEC2Instance)
726 {
727 res = ms3_ec2_set_cred(ret,IAMrole.c_str(),key.c_str(),secret.c_str(),token.c_str());
728 }
729 else
730 {
731 res = ms3_init_assume_role(ret, (IAMrole.empty() ? NULL : IAMrole.c_str()),
732 (STSendpoint.empty() ? NULL : STSendpoint.c_str()),
733 (STSregion.empty() ? NULL : STSregion.c_str()));
734 }
735
736 if (res)
737 {
738 // Something is wrong with the assume role so abort as if the ms3_init failed
739 logger->log(LOG_ERR, "S3Storage::getConnection(): ERROR: ms3_init_assume_role. Verify iam_role_name = %s, aws_access_key_id, aws_secret_access_key values. Also check sts_region and sts_endpoint if configured.",IAMrole.c_str());
740 if (ms3_server_error(ret))
741 logger->log(LOG_ERR, "S3Storage::getConnection(): ms3_error: server says '%s' role name = %s", ms3_server_error(ret), IAMrole.c_str());
742 ms3_deinit(ret);
743 ret = NULL;
744 }
745 }
746 //assert(connMutexes[ret].try_lock());
747 s.unlock();
748 return ret;
749 }
750 assert(freeConns.front().idleSince.tv_sec + maxIdleSecs > now.tv_sec);
751 ret = freeConns.front().conn;
752 freeConns.pop_front();
753 //assert(connMutexes[ret].try_lock());
754 return ret;
755 }
756
returnConnection(ms3_st * ms3)757 void S3Storage::returnConnection(ms3_st *ms3)
758 {
759 assert(ms3);
760 Connection conn;
761 conn.conn = ms3;
762 clock_gettime(CLOCK_MONOTONIC_COARSE, &conn.idleSince);
763
764 boost::unique_lock<boost::mutex> s(connMutex);
765 freeConns.push_front(conn);
766 //connMutexes[ms3].unlock();
767 }
768
769 }
770