/**
 * @file   azure.h
 *
 * @section LICENSE
 *
 * The MIT License
 *
 * @copyright Copyright (c) 2017-2021 TileDB, Inc.
 *
 * Permission is hereby granted, free of charge, to any person obtaining a copy
 * of this software and associated documentation files (the "Software"), to deal
 * in the Software without restriction, including without limitation the rights
 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
 * copies of the Software, and to permit persons to whom the Software is
 * furnished to do so, subject to the following conditions:
 *
 * The above copyright notice and this permission notice shall be included in
 * all copies or substantial portions of the Software.
 *
 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
 * THE SOFTWARE.
 *
 * @section DESCRIPTION
 *
 * This file defines the Azure class.
31 */ 32 33 #ifndef TILEDB_AZURE_H 34 #define TILEDB_AZURE_H 35 36 #ifdef HAVE_AZURE 37 #include "tiledb/common/status.h" 38 #include "tiledb/common/thread_pool.h" 39 #include "tiledb/sm/buffer/buffer.h" 40 #include "tiledb/sm/config/config.h" 41 #include "tiledb/sm/misc/constants.h" 42 #include "tiledb/sm/misc/uri.h" 43 44 #if !defined(NOMINMAX) 45 #define NOMINMAX // avoid min/max macros from windows headers 46 #endif 47 #include <base64.h> 48 #include <blob/blob_client.h> 49 #include <retry.h> 50 #include <storage_account.h> 51 #include <storage_credential.h> 52 #include <list> 53 #include <unordered_map> 54 55 // azure sdk transitively includes mmtime.h which defines TIME_MS 56 #ifdef TIME_MS 57 #undef TIME_MS 58 #endif 59 60 using namespace tiledb::common; 61 62 namespace tiledb { 63 namespace sm { 64 65 class Azure { 66 public: 67 /* ********************************* */ 68 /* CONSTRUCTORS & DESTRUCTORS */ 69 /* ********************************* */ 70 71 /** Constructor. */ 72 Azure(); 73 74 /** Destructor. */ 75 ~Azure(); 76 77 /* ********************************* */ 78 /* API */ 79 /* ********************************* */ 80 81 /** 82 * Initializes and connects an Azure client. 83 * 84 * @param config Configuration parameters. 85 * @param thread_pool The parent VFS thread pool. 86 * @return Status 87 */ 88 Status init(const Config& config, ThreadPool* thread_pool); 89 90 /** 91 * Creates a container. 92 * 93 * @param container The name of the container to be created. 94 * @return Status 95 */ 96 Status create_container(const URI& container) const; 97 98 /** Removes the contents of an Azure container. */ 99 Status empty_container(const URI& container) const; 100 101 /** 102 * Flushes an blob to Azure, finalizing the upload. 103 * 104 * @param uri The URI of the blob to be flushed. 105 * @return Status 106 */ 107 Status flush_blob(const URI& uri); 108 109 /** 110 * Check if a container is empty. 111 * 112 * @param container The name of the container. 
113 * @param is_empty Mutates to `true` if the container is empty. 114 * @return Status 115 */ 116 Status is_empty_container(const URI& uri, bool* is_empty) const; 117 118 /** 119 * Check if a container exists. 120 * 121 * @param container The name of the container. 122 * @param is_container Mutates to `true` if `uri` is a container. 123 * @return Status 124 */ 125 Status is_container(const URI& uri, bool* is_container) const; 126 127 /** 128 * Checks if there is an object with prefix `uri/`. For instance, suppose 129 * the following objects exist: 130 * 131 * `azure://some_container/foo/bar1` 132 * `azure://some_container/foo2` 133 * 134 * `is_dir(`azure://some_container/foo`) and 135 * `is_dir(`azure://some_container/foo`) will both return `true`, whereas 136 * `is_dir(`azure://some_container/foo2`) will return `false`. This is because 137 * the function will first convert the input to `azure://some_container/foo2/` 138 * (appending `/` in the end) and then check if there exists any object with 139 * prefix `azure://some_container/foo2/` (in this case there is not). 140 * 141 * @param uri The URI to check. 142 * @param exists Sets it to `true` if the above mentioned condition holds. 143 * @return Status 144 */ 145 Status is_dir(const URI& uri, bool* exists) const; 146 147 /** 148 * Checks if the given URI is an existing Azure blob. 149 * 150 * @param uri The URI of the object to be checked. 151 * @param is_blob Mutates to `true` if `uri` is an existing blob, and `false` 152 * otherwise. 153 */ 154 Status is_blob(const URI& uri, bool* is_blob) const; 155 156 /** 157 * Lists the objects that start with `uri`. Full URI paths are 158 * retrieved for the matched objects. If a delimiter is specified, 159 * the URI paths will be truncated to the first delimiter character. 
160 * For instance, if there is a hierarchy: 161 * 162 * - `foo/bar/baz` 163 * - `foo/bar/bash` 164 * - `foo/bar/bang` 165 * - `foo/boo` 166 * 167 * and the delimiter is `/`, the returned URIs will be 168 * 169 * - `foo/boo` 170 * - `foo/bar` 171 * 172 * @param uri The prefix URI. 173 * @param paths Pointer of a vector of URIs to store the retrieved paths. 174 * @param delimiter The delimiter that will 175 * @param max_paths The maximum number of paths to be retrieved. The default 176 * `-1` indicates that no upper bound is specified. 177 * @return Status 178 */ 179 Status ls( 180 const URI& uri, 181 std::vector<std::string>* paths, 182 const std::string& delimiter = "/", 183 int max_paths = -1) const; 184 185 /** 186 * Renames an object. 187 * 188 * @param old_uri The URI of the old path. 189 * @param new_uri The URI of the new path. 190 * @return Status 191 */ 192 Status move_object(const URI& old_uri, const URI& new_uri); 193 194 /** 195 * Renames a directory. Note that this is an expensive operation. 196 * The function will essentially copy all objects with directory 197 * prefix `old_uri` to new objects with prefix `new_uri` and then 198 * delete the old ones. 199 * 200 * @param old_uri The URI of the old path. 201 * @param new_uri The URI of the new path. 202 * @return Status 203 */ 204 Status move_dir(const URI& old_uri, const URI& new_uri); 205 206 /** 207 * Returns the size of the input blob with a given URI in bytes. 208 * 209 * @param uri The URI of the blob. 210 * @param nbytes Pointer to `uint64_t` bytes to return. 211 * @return Status 212 */ 213 Status blob_size(const URI& uri, uint64_t* nbytes) const; 214 215 /** 216 * Reads data from an object into a buffer. 217 * 218 * @param uri The URI of the object to be read. 219 * @param offset The offset in the object from which the read will start. 220 * @param buffer The buffer into which the data will be written. 221 * @param length The size of the data to be read from the object. 
222 * @param read_ahead_length The additional length to read ahead. 223 * @param length_returned Returns the total length read into `buffer`. 224 * @return Status 225 */ 226 Status read( 227 const URI& uri, 228 off_t offset, 229 void* buffer, 230 uint64_t length, 231 uint64_t read_ahead_length, 232 uint64_t* length_returned) const; 233 234 /** 235 * Deletes a container. 236 * 237 * @param uri The URI of the container to be deleted. 238 * @return Status 239 */ 240 Status remove_container(const URI& uri) const; 241 242 /** 243 * Deletes an blob with a given URI. 244 * 245 * @param uri The URI of the blob to be deleted. 246 * @return Status 247 */ 248 Status remove_blob(const URI& uri) const; 249 250 /** 251 * Deletes all objects with prefix `uri/` (if the ending `/` does not 252 * exist in `uri`, it is added by the function. 253 * 254 * For instance, suppose there exist the following objects: 255 * - `azure://some_container/foo/bar1` 256 * - `azure://some_container/foo/bar2/bar3 257 * - `azure://some_container/foo/bar4 258 * - `azure://some_container/foo2` 259 * 260 * `remove("azure://some_container/foo")` and 261 * `remove("azure://some_container/foo/")` will delete objects: 262 * 263 * - `azure://some_container/foo/bar1` 264 * - `azure://some_container/foo/bar2/bar3 265 * - `azure://some_container/foo/bar4 266 * 267 * In contrast, `remove("azure://some_container/foo2")` will not delete 268 * anything; the function internally appends `/` to the end of the URI, and 269 * therefore there is not object with prefix "azure://some_container/foo2/" in 270 * this example. 271 * 272 * @param uri The prefix uri of the objects to be deleted. 273 * @return Status 274 */ 275 Status remove_dir(const URI& uri) const; 276 277 /** 278 * Creates an empty blob. 279 * 280 * @param uri The URI of the blob to be created. 281 * @return Status 282 */ 283 Status touch(const URI& uri) const; 284 285 /** 286 * Writes the input buffer to an Azure object. 
Note that this is essentially 287 * an append operation implemented via multipart uploads. 288 * 289 * @param uri The URI of the object to be written to. 290 * @param buffer The input buffer. 291 * @param length The size of the input buffer. 292 * @return Status 293 */ 294 Status write(const URI& uri, const void* buffer, uint64_t length); 295 296 private: 297 /* ********************************* */ 298 /* PRIVATE DATATYPES */ 299 /* ********************************* */ 300 301 class AzureRetryPolicy final : public azure::storage_lite::retry_policy_base { 302 public: 303 /** 304 * The SDK invokes this routine before each request to Azure. This returns 305 * a pair: a bool to indicate if we should make a request and a time 306 * interval to wait before starting the request. 307 */ evaluate(const azure::storage_lite::retry_context & context)308 azure::storage_lite::retry_info evaluate( 309 const azure::storage_lite::retry_context& context) const override { 310 const int http_code = context.result(); 311 312 // When the response code is 0, the SDK has yet to send the initial 313 // request. Allow the request to start immediately by returning a 0-second 314 // delay. 315 if (http_code == 0) { 316 return azure::storage_lite::retry_info(true, std::chrono::seconds(0)); 317 } 318 319 // Determine if we should retry on the returned http code in the response. 320 if (!should_retry(http_code)) { 321 return azure::storage_lite::retry_info(false, std::chrono::seconds(0)); 322 } 323 324 // Wait one second before all retry attempts. 325 static const int32_t max_retries = constants::azure_max_attempts; 326 if (context.numbers() < max_retries) { 327 return azure::storage_lite::retry_info( 328 true, 329 std::chrono::seconds(constants::azure_attempt_sleep_ms / 1000)); 330 } 331 332 // All retry attempts exhausted. 
333 return azure::storage_lite::retry_info(false, std::chrono::seconds(0)); 334 } 335 336 private: 337 /** 338 * Returns true if we should attempt a retry after receiving 'http_code' 339 * in the last response. 340 */ should_retry(const int http_code)341 bool should_retry(const int http_code) const { 342 // Only retry on server errors. 343 if (http_code >= 500 && http_code < 600) { 344 return true; 345 } 346 347 return false; 348 } 349 }; 350 351 /** Contains all state associated with a block list upload transaction. */ 352 class BlockListUploadState { 353 public: BlockListUploadState()354 BlockListUploadState() 355 : next_block_id_(0) 356 , st_(Status::Ok()) { 357 } 358 359 /* Generates the next base64-encoded block id. */ next_block_id()360 std::string next_block_id() { 361 const uint64_t block_id = next_block_id_++; 362 const std::string block_id_str = std::to_string(block_id); 363 364 // Pad the block id string with enough leading zeros to support 365 // the maximum number of blocks (50,000). All block ids must be 366 // of equal length among a single blob. 367 const int block_id_chars = 5; 368 const std::string padded_block_id_str = 369 std::string(block_id_chars - block_id_str.length(), '0') + 370 block_id_str; 371 372 const std::string b64_block_id_str = azure::storage_lite::to_base64( 373 reinterpret_cast<const unsigned char*>(padded_block_id_str.c_str()), 374 padded_block_id_str.size()); 375 376 block_ids_.emplace_back(b64_block_id_str); 377 378 return b64_block_id_str; 379 } 380 381 /* Returns all generated block ids. */ get_block_ids()382 std::list<std::string> get_block_ids() const { 383 return block_ids_; 384 } 385 386 /* Returns the aggregate status. */ st()387 Status st() const { 388 return st_; 389 } 390 391 /* Updates 'st_' if 'st' is non-OK */ update_st(const Status & st)392 void update_st(const Status& st) { 393 if (!st.ok()) { 394 st_ = st; 395 } 396 } 397 398 private: 399 // The next block id to generate. 
400 uint64_t next_block_id_; 401 402 // A list of all generated block ids. 403 std::list<std::string> block_ids_; 404 405 // The aggregate status. If any individual block 406 // upload fails, this will be in a non-OK status. 407 Status st_; 408 }; 409 410 /** 411 * A zero-copy stream buffer used as a work-around for writing 412 * a single buffer to the stream-only SDK interface. 413 */ 414 class ZeroCopyStreamBuffer : public std::streambuf { 415 public: ZeroCopyStreamBuffer(char * const buffer,std::size_t size)416 ZeroCopyStreamBuffer(char* const buffer, std::size_t size) { 417 setg(buffer, buffer, buffer + size); 418 } 419 }; 420 421 /* ********************************* */ 422 /* PRIVATE ATTRIBUTES */ 423 /* ********************************* */ 424 425 /** The VFS thread pool. */ 426 ThreadPool* thread_pool_; 427 428 /** The Azure blob storage client. */ 429 tdb_shared_ptr<azure::storage_lite::blob_client> client_; 430 431 /** Maps a blob URI to an write cache buffer. */ 432 std::unordered_map<std::string, Buffer> write_cache_map_; 433 434 /** Protects 'write_cache_map_'. */ 435 std::mutex write_cache_map_lock_; 436 437 /** The maximum size of each value-element in 'write_cache_map_'. */ 438 uint64_t write_cache_max_size_; 439 440 /** The maximum number of parallel requests. */ 441 uint64_t max_parallel_ops_; 442 443 /** The target block size in a block list upload */ 444 uint64_t block_list_block_size_; 445 446 /** Whether or not to use block list upload. */ 447 bool use_block_list_upload_; 448 449 /** Maps a blob URI to its block list upload state. */ 450 std::unordered_map<std::string, BlockListUploadState> 451 block_list_upload_states_; 452 453 /** Protects 'block_list_upload_states_'. */ 454 std::mutex block_list_upload_states_lock_; 455 456 /* ********************************* */ 457 /* PRIVATE METHODS */ 458 /* ********************************* */ 459 460 /** 461 * Thread-safe fetch of the write cache buffer in `write_cache_map_`. 
462 * If a buffer does not exist for `uri`, it will be created. 463 * 464 * @param uri The blob URI. 465 * @return Buffer 466 */ 467 Buffer* get_write_cache_buffer(const std::string& uri); 468 469 /** 470 * Fills the write cache buffer (given as an input `Buffer` object) from 471 * the input binary `buffer`, up until the size of the file buffer becomes 472 * `write_cache_max_size_`. It also retrieves the number of bytes filled. 473 * 474 * @param write_cache_buffer The destination write cache buffer to fill. 475 * @param buffer The source binary buffer to fill the data from. 476 * @param length The length of `buffer`. 477 * @param nbytes_filled The number of bytes filled into `write_cache_buffer`. 478 * @return Status 479 */ 480 Status fill_write_cache( 481 Buffer* write_cache_buffer, 482 const void* buffer, 483 const uint64_t length, 484 uint64_t* nbytes_filled); 485 486 /** 487 * Writes the contents of the input buffer to the blob given by 488 * the input `uri` as a new series of block uploads. Resets 489 * 'write_cache_buffer'. 490 * 491 * @param uri The blob URI. 492 * @param write_cache_buffer The input buffer to flush. 493 * @param last_block Should be true only when the flush corresponds to the 494 * last block(s) of a block list upload. 495 * @return Status 496 */ 497 Status flush_write_cache( 498 const URI& uri, Buffer* write_cache_buffer, bool last_block); 499 500 /** 501 * Writes the input buffer as an uncommited block to Azure by issuing one 502 * or more block upload requests. 503 * 504 * @param uri The blob URI. 505 * @param buffer The input buffer. 506 * @param length The size of the input buffer. 507 * @param last_part Should be true only when this is the last block of a blob. 508 * @return Status 509 */ 510 Status write_blocks( 511 const URI& uri, const void* buffer, uint64_t length, bool last_block); 512 513 /** 514 * Executes and waits for a single, uncommited block upload. 515 * 516 * @param container_name The blob's container name. 
517 * @param blob_path The blob's file path relative to the container. 518 * @param length The length of `buffer`. 519 * @param block_id A base64-encoded string that is unique to this block 520 * within the blob. 521 * @param result The returned future to fetch the async upload result from. 522 * @return Status 523 */ 524 Status upload_block( 525 const std::string& container_name, 526 const std::string& blob_path, 527 const void* const buffer, 528 const uint64_t length, 529 const std::string& block_id); 530 531 /** 532 * Clears all instance state related to a block list upload on 'uri'. 533 */ 534 void finish_block_list_upload(const URI& uri); 535 536 /** 537 * Uploads the write cache buffer associated with 'uri' as an entire 538 * blob. 539 */ 540 Status flush_blob_direct(const URI& uri); 541 542 /** 543 * Parses a URI into a container name and blob path. For example, 544 * URI "azure://my-container/dir1/file1" will parse into 545 * `*container_name == "my-container"` and `*blob_path == "dir1/file1"`. 546 * 547 * @param uri The URI to parse. 548 * @param container_name Mutates to the container name. 549 * @param blob_path Mutates to the blob path. 550 * @return Status 551 */ 552 Status parse_azure_uri( 553 const URI& uri, 554 std::string* container_name, 555 std::string* blob_path) const; 556 557 /** 558 * Copies the blob at 'old_uri' to `new_uri`. 559 * 560 * @param old_uri The blob's current URI. 561 * @param new_uri The blob's URI to move to. 562 * @return Status 563 */ 564 Status copy_blob(const URI& old_uri, const URI& new_uri); 565 566 /** 567 * Waits for a blob with `container_name` and `blob_path` 568 * to exist on Azure. 569 * 570 * @param container_name The blob's container name. 
571 * @param blob_path The blob's path 572 * @return Status 573 */ 574 Status wait_for_blob_to_propagate( 575 const std::string& container_name, const std::string& blob_path) const; 576 577 /** 578 * Waits for a blob with `container_name` and `blob_path` 579 * to not exist on Azure. 580 * 581 * @param container_name The blob's container name. 582 * @param blob_path The blob's path 583 * @return Status 584 */ 585 Status wait_for_blob_to_be_deleted( 586 const std::string& container_name, const std::string& blob_path) const; 587 588 /** 589 * Waits for a container with `container_name` 590 * to exist on Azure. 591 * 592 * @param container_name The container's name. 593 * @return Status 594 */ 595 Status wait_for_container_to_propagate( 596 const std::string& container_name) const; 597 598 /** 599 * Waits for a container with `container_name` 600 * to not exist on Azure. 601 * 602 * @param container_name The container's name. 603 * @return Status 604 */ 605 Status wait_for_container_to_be_deleted( 606 const std::string& container_name) const; 607 608 /** 609 * Check if 'container_name' is a container on Azure. 610 * 611 * @param container_name The container's name. 612 * @param is_container Mutates to the output. 613 * @return Status 614 */ 615 Status is_container( 616 const std::string& container_name, bool* const is_container) const; 617 618 /** 619 * Check if 'is_blob' is a blob on Azure. 620 * 621 * @param container_name The blob's container name. 622 * @param blob_path The blob's path. 623 * @param is_blob Mutates to the output. 624 * @return Status 625 */ 626 Status is_blob( 627 const std::string& container_name, 628 const std::string& blob_path, 629 bool* const is_blob) const; 630 631 /** 632 * Removes a leading slash from 'path' if it exists. 633 * 634 * @param path the string to remove the leading slash from. 
635 */ 636 std::string remove_front_slash(const std::string& path) const; 637 638 /** 639 * Adds a trailing slash from 'path' if it doesn't already have one. 640 * 641 * @param path the string to add the trailing slash to. 642 */ 643 std::string add_trailing_slash(const std::string& path) const; 644 645 /** 646 * Removes a trailing slash from 'path' if it exists. 647 * 648 * @param path the string to remove the trailing slash from. 649 */ 650 std::string remove_trailing_slash(const std::string& path) const; 651 }; 652 653 } // namespace sm 654 } // namespace tiledb 655 656 #endif // HAVE_AZURE 657 #endif // TILEDB_AZURE_H 658