1 /** 2 * @file vfs.h 3 * 4 * @section LICENSE 5 * 6 * The MIT License 7 * 8 * @copyright Copyright (c) 2017-2021 TileDB, Inc. 9 * 10 * Permission is hereby granted, free of charge, to any person obtaining a copy 11 * of this software and associated documentation files (the "Software"), to deal 12 * in the Software without restriction, including without limitation the rights 13 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell 14 * copies of the Software, and to permit persons to whom the Software is 15 * furnished to do so, subject to the following conditions: 16 * 17 * The above copyright notice and this permission notice shall be included in 18 * all copies or substantial portions of the Software. 19 * 20 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR 21 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, 22 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE 23 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER 24 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, 25 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN 26 * THE SOFTWARE. 27 * 28 * @section DESCRIPTION 29 * 30 * This file declares the VFS class. 31 */ 32 33 #ifndef TILEDB_VFS_H 34 #define TILEDB_VFS_H 35 36 #include <functional> 37 #include <list> 38 #include <set> 39 #include <string> 40 #include <vector> 41 42 #include "tiledb/common/macros.h" 43 #include "tiledb/common/status.h" 44 #include "tiledb/common/thread_pool.h" 45 #include "tiledb/sm/buffer/buffer.h" 46 #include "tiledb/sm/cache/lru_cache.h" 47 #include "tiledb/sm/config/config.h" 48 #include "tiledb/sm/filesystem/filelock.h" 49 #include "tiledb/sm/filesystem/mem_filesystem.h" 50 #include "tiledb/sm/misc/cancelable_tasks.h" 51 #include "tiledb/sm/misc/uri.h" 52 #include "tiledb/sm/stats/stats.h" 53 54 #ifdef _WIN32 55 #include "tiledb/sm/filesystem/win.h" 56 #else 57 #include "tiledb/sm/filesystem/posix.h" 58 #endif 59 60 #ifdef HAVE_GCS 61 #include "tiledb/sm/filesystem/gcs.h" 62 #endif 63 64 #ifdef HAVE_S3 65 #include "tiledb/sm/filesystem/s3.h" 66 #endif 67 68 #ifdef HAVE_HDFS 69 #include "tiledb/sm/filesystem/hdfs_filesystem.h" 70 #endif 71 72 #ifdef HAVE_AZURE 73 #include "tiledb/sm/filesystem/azure.h" 74 #endif 75 76 using namespace tiledb::common; 77 78 namespace tiledb { 79 namespace sm { 80 81 enum class Filesystem : uint8_t; 82 enum class VFSMode : uint8_t; 83 84 /** 85 * This class implements a virtual filesystem that directs filesystem-related 86 * function execution to the appropriate backend based on the input URI. 87 */ 88 class VFS { 89 public: 90 /* ********************************* */ 91 /* CONSTRUCTORS & DESTRUCTORS */ 92 /* ********************************* */ 93 94 /** Constructor. */ 95 VFS(); 96 97 /** Destructor. */ 98 ~VFS() = default; 99 100 DISABLE_COPY_AND_COPY_ASSIGN(VFS); 101 DISABLE_MOVE_AND_MOVE_ASSIGN(VFS); 102 103 /* ********************************* */ 104 /* API */ 105 /* ********************************* */ 106 107 /** 108 * Returns the absolute path of the input string (mainly useful for 109 * posix URI's). 110 * 111 * @param path The input path. 112 * @return The string with the absolute path. 113 */ 114 static std::string abs_path(const std::string& path); 115 116 /** 117 * Return a config object containing the VFS parameters. All other non-VFS 118 * parameters will are set to default values. 119 */ 120 Config config() const; 121 122 /** 123 * Creates a directory. 124 * 125 * - On S3, this is a noop. 126 * - On all other backends, if the directory exists, the function 127 * just succeeds without doing anything. 128 * 129 * @param uri The URI of the directory. 130 * @return Status 131 */ 132 Status create_dir(const URI& uri) const; 133 134 /** 135 * Creates an empty file. 136 * 137 * @param uri The URI of the file. 138 * @return Status 139 */ 140 Status touch(const URI& uri) const; 141 142 /** 143 * Cancels all background or queued tasks. 144 */ 145 Status cancel_all_tasks(); 146 147 /** 148 * Creates an object store bucket. 149 * 150 * @param uri The name of the bucket to be created. 151 * @return Status 152 */ 153 Status create_bucket(const URI& uri) const; 154 155 /** 156 * Returns the size of the files in the input directory. 157 * This function is **recursive**, i.e., it will calculate 158 * the sum of the files in the entire directory tree rooted 159 * at `dir_name`. 160 * 161 * @param dir_name The input directory. 162 * @param dir_size The directory size to be retrieved, as the 163 * sum of the files one level deep. 164 * @return Status 165 */ 166 Status dir_size(const URI& dir_name, uint64_t* dir_size) const; 167 168 /** 169 * Deletes an object store bucket. 170 * 171 * @param uri The name of the bucket to be deleted. 172 * @return Status 173 */ 174 Status remove_bucket(const URI& uri) const; 175 176 /** 177 * Deletes the contents of an object store bucket. 178 * 179 * @param uri The name of the bucket to be emptied. 180 * @return Status 181 */ 182 Status empty_bucket(const URI& uri) const; 183 184 /** 185 * Removes a given directory (recursive) 186 * 187 * @param uri The uri of the directory to be removed 188 * @return Status 189 */ 190 Status remove_dir(const URI& uri) const; 191 192 /** 193 * Deletes a file. 194 * 195 * @param uri The URI of the file. 196 * @return Status 197 */ 198 Status remove_file(const URI& uri) const; 199 200 /** 201 * Locks a filelock. 202 * 203 * @param uri The URI of the filelock. 204 * @param lock A handle for the filelock (used in unlocking the 205 * filelock). 206 * @param shared *True* if it is a shared lock, *false* if it is an 207 * exclusive lock. 208 * @return Status 209 */ 210 Status filelock_lock(const URI& uri, filelock_t* lock, bool shared) const; 211 212 /** 213 * Unlocks a filelock. 214 * 215 * @param uri The URI of the filelock. 216 * @param lock The handle of the filelock. 217 * @return Status 218 */ 219 Status filelock_unlock(const URI& uri) const; 220 221 /** 222 * Retrieves the size of a file. 223 * 224 * @param uri The URI of the file. 225 * @param size The file size to be retrieved. 226 * @return Status 227 */ 228 Status file_size(const URI& uri, uint64_t* size) const; 229 230 /** 231 * Checks if a directory exists. 232 * 233 * @param uri The URI of the directory. 234 * @param is_dir Set to `true` if the directory exists and `false` otherwise. 235 * @return Status 236 * 237 * @note For S3, this function will return `true` if there is an object 238 * with prefix `uri/` (TileDB will append `/` internally to `uri` 239 * only if it does not exist), and `false` othewise. 240 */ 241 Status is_dir(const URI& uri, bool* is_dir) const; 242 243 /** 244 * Checks if a file exists. 245 * 246 * @param uri The URI of the file. 247 * @param is_file Set to `true` if the file exists and `false` otherwise. 248 * @return Status 249 */ 250 Status is_file(const URI& uri, bool* is_file) const; 251 252 /** 253 * Checks if an object store bucket exists. 254 * 255 * @param uri The name of the object store bucket. 256 * @return is_bucket Set to `true` if the bucket exists and `false` otherwise. 257 * @return Status 258 */ 259 Status is_bucket(const URI& uri, bool* is_bucket) const; 260 261 /** 262 * Checks if an object-store bucket is empty. 263 * 264 * @param uri The name of the object store bucket. 265 * @param is_empty Set to `true` if the bucket is empty and `false` otherwise. 266 */ 267 Status is_empty_bucket(const URI& uri, bool* is_empty) const; 268 269 /** 270 * Initializes the virtual filesystem with the given configuration. 271 * 272 * @param parent_stats The parent stats to inherit from. 273 * @param config Configuration parameters 274 * @return Status 275 */ 276 Status init( 277 stats::Stats* parent_stats, 278 ThreadPool* compute_tp, 279 ThreadPool* io_tp, 280 const Config* ctx_config, 281 const Config* vfs_config); 282 283 /** 284 * Terminates the virtual system. Must only be called if init() returned 285 * successfully. The behavior is undefined if not successfully invoked prior 286 * to destructing this object. 287 * 288 * @return Status 289 */ 290 Status terminate(); 291 292 /** 293 * Retrieves all the URIs that have the first input as parent. 294 * 295 * @param parent The target directory to list. 296 * @param uris The URIs that are contained in the parent. 297 * @return Status 298 */ 299 Status ls(const URI& parent, std::vector<URI>* uris) const; 300 301 /** 302 * Renames a file. 303 * 304 * @param old_uri The old URI. 305 * @param new_uri The new URI. 306 * @return Status 307 */ 308 Status move_file(const URI& old_uri, const URI& new_uri); 309 310 /** 311 * Renames a directory. 312 * 313 * @param old_uri The old URI. 314 * @param new_uri The new URI. 315 * @return Status 316 */ 317 Status move_dir(const URI& old_uri, const URI& new_uri); 318 319 /** 320 * Copies a file. 321 * 322 * @param old_uri The old URI. 323 * @param new_uri The new URI. 324 * @return Status 325 */ 326 Status copy_file(const URI& old_uri, const URI& new_uri); 327 328 /** 329 * Copies directory. 330 * 331 * @param old_uri The old URI. 332 * @param new_uri The new URI. 333 * @return Status 334 */ 335 Status copy_dir(const URI& old_uri, const URI& new_uri); 336 337 /** 338 * Reads from a file. 339 * 340 * @param uri The URI of the file. 341 * @param offset The offset where the read begins. 342 * @param buffer The buffer to read into. 343 * @param nbytes Number of bytes to read. 344 * @param use_read_ahead Whether to use the read-ahead cache. 345 * @return Status 346 */ 347 Status read( 348 const URI& uri, 349 uint64_t offset, 350 void* buffer, 351 uint64_t nbytes, 352 bool use_read_ahead = true); 353 354 /** 355 * Reads multiple regions from a file. 356 * 357 * @param uri The URI of the file. 358 * @param regions The list of regions to read. Each region is a tuple 359 * `(file_offset, dest_buffer, nbytes)`. 360 * @param thread_pool Thread pool to execute async read tasks to. 361 * @param tasks Vector to which new async read tasks are pushed. 362 * @param use_read_ahead Whether to use the read-ahead cache. 363 * @return Status 364 */ 365 Status read_all( 366 const URI& uri, 367 const std::vector<std::tuple<uint64_t, void*, uint64_t>>& regions, 368 ThreadPool* thread_pool, 369 std::vector<ThreadPool::Task>* tasks, 370 bool use_read_ahead = true); 371 372 /** Checks if a given filesystem is supported. */ 373 bool supports_fs(Filesystem fs) const; 374 375 /** Checks if the backend required to access the given URI is supported. */ 376 bool supports_uri_scheme(const URI& uri) const; 377 378 /** 379 * Syncs (flushes) a file. Note that for S3 this is a noop. 380 * 381 * @param uri The URI of the file. 382 * @return Status 383 */ 384 Status sync(const URI& uri); 385 386 /** 387 * Opens a file in a given mode. 388 * 389 * 390 * @param uri The URI of the file. 391 * @param mode The mode in which the file is opened: 392 * - READ <br> 393 * The file is opened for reading. An error is returned if the file 394 * does not exist. 395 * - WRITE <br> 396 * The file is opened for writing. If the file exists, it will be 397 * overwritten. 398 * - APPEND <b> 399 * The file is opened for writing. If the file exists, the write 400 * will start from the end of the file. Note that S3 does not 401 * support this operation and, thus, an error will be thrown in 402 * that case. 403 * @return Status 404 */ 405 Status open_file(const URI& uri, VFSMode mode); 406 407 /** 408 * Closes a file, flushing its contents to persistent storage. 409 * 410 * @param uri The URI of the file. 411 * @return Status 412 */ 413 Status close_file(const URI& uri); 414 415 /** 416 * Writes the contents of a buffer into a file. 417 * 418 * @param uri The URI of the file. 419 * @param buffer The buffer to write from. 420 * @param buffer_size The buffer size. 421 * @return Status 422 */ 423 Status write(const URI& uri, const void* buffer, uint64_t buffer_size); 424 425 private: 426 /* ********************************* */ 427 /* PRIVATE DATATYPES */ 428 /* ********************************* */ 429 430 /** 431 * Helper type holding information about a batched read operation. 432 */ 433 struct BatchedRead { 434 /** Construct a BatchedRead consisting of the single given region. */ BatchedReadBatchedRead435 BatchedRead(const std::tuple<uint64_t, void*, uint64_t>& region) { 436 offset = std::get<0>(region); 437 nbytes = std::get<2>(region); 438 regions.push_back(region); 439 } 440 441 /** Offset of the batch. */ 442 uint64_t offset; 443 444 /** Number of bytes in the batch. */ 445 uint64_t nbytes; 446 447 /** 448 * Original regions making up the batch. Vector of tuples of the form 449 * (offset, dest_buffer, nbytes). 450 */ 451 std::vector<std::tuple<uint64_t, void*, uint64_t>> regions; 452 }; 453 454 /** 455 * Represents a sub-range of data within a URI file at a 456 * specific file offset. 457 */ 458 struct ReadAheadBuffer { 459 /* ********************************* */ 460 /* CONSTRUCTORS */ 461 /* ********************************* */ 462 463 /** Value Constructor. */ ReadAheadBufferReadAheadBuffer464 ReadAheadBuffer(const uint64_t offset, Buffer&& buffer) 465 : offset_(offset) 466 , buffer_(std::move(buffer)) { 467 } 468 469 /** Move Constructor. */ ReadAheadBufferReadAheadBuffer470 ReadAheadBuffer(ReadAheadBuffer&& other) 471 : offset_(other.offset_) 472 , buffer_(std::move(other.buffer_)) { 473 } 474 475 /* ********************************* */ 476 /* OPERATORS */ 477 /* ********************************* */ 478 479 /** Move-Assign Operator. */ 480 ReadAheadBuffer& operator=(ReadAheadBuffer&& other) { 481 offset_ = other.offset_; 482 buffer_ = std::move(other.buffer_); 483 return *this; 484 } 485 486 DISABLE_COPY_AND_COPY_ASSIGN(ReadAheadBuffer); 487 488 /* ********************************* */ 489 /* ATTRIBUTES */ 490 /* ********************************* */ 491 492 /** The offset within the associated URI. */ 493 uint64_t offset_; 494 495 /** The buffered data at `offset`. */ 496 Buffer buffer_; 497 }; 498 499 /** 500 * An LRU cache of `ReadAheadBuffer` objects keyed by a URI string. 501 */ 502 class ReadAheadCache : public LRUCache<std::string, ReadAheadBuffer> { 503 public: 504 /* ********************************* */ 505 /* CONSTRUCTORS & DESTRUCTORS */ 506 /* ********************************* */ 507 508 /** Constructor. */ ReadAheadCache(const uint64_t max_cached_buffers)509 ReadAheadCache(const uint64_t max_cached_buffers) 510 : LRUCache(max_cached_buffers) { 511 } 512 513 /** Destructor. */ 514 virtual ~ReadAheadCache() = default; 515 516 /* ********************************* */ 517 /* API */ 518 /* ********************************* */ 519 520 /** 521 * Attempts to read a buffer from the cache. 522 * 523 * @param uri The URI associated with the buffer to cache. 524 * @param offset The offset that buffer starts at within the URI. 525 * @param buffer The buffer to cache. 526 * @param nbytes The number of bytes within the buffer. 527 * @param success True if `buffer` was read from the cache. 528 * @return Status 529 */ read(const URI & uri,const uint64_t offset,void * const buffer,const uint64_t nbytes,bool * const success)530 Status read( 531 const URI& uri, 532 const uint64_t offset, 533 void* const buffer, 534 const uint64_t nbytes, 535 bool* const success) { 536 assert(success); 537 *success = false; 538 539 // Store the URI's string representation. 540 const std::string uri_str = uri.to_string(); 541 542 // Protect access to the derived LRUCache routines. 543 std::lock_guard<std::mutex> lg(lru_mtx_); 544 545 // Check that a cached buffer exists for `uri`. 546 if (!has_item(uri_str)) 547 return Status::Ok(); 548 549 // Store a reference to the cached buffer. 550 const ReadAheadBuffer* const ra_buffer = get_item(uri_str); 551 552 // Check that the read offset is not below the offset of 553 // the cached buffer. 554 if (offset < ra_buffer->offset_) 555 return Status::Ok(); 556 557 // Calculate the offset within the cached buffer that corresponds 558 // to the requested read offset. 559 const uint64_t offset_in_buffer = offset - ra_buffer->offset_; 560 561 // Check that both the start and end positions of the requested 562 // read range reside within the cached buffer. 563 if (offset_in_buffer + nbytes > ra_buffer->buffer_.size()) 564 return Status::Ok(); 565 566 // Copy the subrange of the cached buffer that satisfies the caller's 567 // read request back into their output `buffer`. 568 std::memcpy( 569 buffer, 570 static_cast<uint8_t*>(ra_buffer->buffer_.data()) + offset_in_buffer, 571 nbytes); 572 573 // Touch the item to make it the most recently used item. 574 touch_item(uri_str); 575 576 *success = true; 577 return Status::Ok(); 578 } 579 580 /** 581 * Writes a cached buffer for the given uri. 582 * 583 * @param uri The URI associated with the buffer to cache. 584 * @param offset The offset that buffer starts at within the URI. 585 * @param buffer The buffer to cache. 586 * @return Status 587 */ insert(const URI & uri,const uint64_t offset,Buffer && buffer)588 Status insert(const URI& uri, const uint64_t offset, Buffer&& buffer) { 589 // Protect access to the derived LRUCache routines. 590 std::lock_guard<std::mutex> lg(lru_mtx_); 591 592 const uint64_t size = buffer.size(); 593 ReadAheadBuffer ra_buffer(offset, std::move(buffer)); 594 return LRUCache<std::string, ReadAheadBuffer>::insert( 595 uri.to_string(), std::move(ra_buffer), size); 596 } 597 598 private: 599 /* ********************************* */ 600 /* PRIVATE ATTRIBUTES */ 601 /* ********************************* */ 602 603 // Protects LRUCache routines. 604 std::mutex lru_mtx_; 605 }; 606 607 /* ********************************* */ 608 /* PRIVATE ATTRIBUTES */ 609 /* ********************************* */ 610 611 #ifdef HAVE_AZURE 612 Azure azure_; 613 #endif 614 615 #ifdef HAVE_GCS 616 GCS gcs_; 617 #endif 618 619 #ifdef HAVE_S3 620 S3 s3_; 621 #endif 622 623 #ifdef _WIN32 624 Win win_; 625 #else 626 Posix posix_; 627 #endif 628 629 #ifdef HAVE_HDFS 630 tdb_unique_ptr<hdfs::HDFS> hdfs_; 631 #endif 632 633 /** The class stats. */ 634 stats::Stats* stats_; 635 636 /** The in-memory filesystem which is always supported */ 637 MemFilesystem memfs_; 638 639 /** Config. */ 640 Config config_; 641 642 /** `true` if the VFS object has been initialized. */ 643 bool init_; 644 645 /** The byte size to read-ahead for each read. */ 646 uint64_t read_ahead_size_; 647 648 /** The set with the supported filesystems. */ 649 std::set<Filesystem> supported_fs_; 650 651 /** Thread pool for compute-bound tasks. */ 652 ThreadPool* compute_tp_; 653 654 /** Thread pool for io-bound tasks. */ 655 ThreadPool* io_tp_; 656 657 /** Wrapper for tracking and canceling certain tasks on 'thread_pool' */ 658 CancelableTasks cancelable_tasks_; 659 660 /** The read-ahead cache. */ 661 tdb_unique_ptr<ReadAheadCache> read_ahead_cache_; 662 663 /* ********************************* */ 664 /* PRIVATE METHODS */ 665 /* ********************************* */ 666 667 /** 668 * Groups the given vector of regions to be read into a possibly smaller 669 * vector of batched reads. 670 * 671 * @param regions Vector of individual regions to be read. Each region is a 672 * tuple `(file_offset, dest_buffer, nbytes)`. 673 * @param batches Vector storing the batched read information. 674 * @return Status 675 */ 676 Status compute_read_batches( 677 const std::vector<std::tuple<uint64_t, void*, uint64_t>>& regions, 678 std::vector<BatchedRead>* batches) const; 679 680 /** 681 * Reads from a file by calling the specific backend read function. 682 * 683 * @param uri The URI of the file. 684 * @param offset The offset where the read begins. 685 * @param buffer The buffer to read into. 686 * @param nbytes Number of bytes to read. 687 * @param use_read_ahead Whether to use the read-ahead cache. 688 * @return Status 689 */ 690 Status read_impl( 691 const URI& uri, 692 uint64_t offset, 693 void* buffer, 694 uint64_t nbytes, 695 bool use_read_ahead); 696 697 /** 698 * Executes a read, using the read-ahead cache as necessary. 699 * 700 * @param read_fn The read routine to execute. 701 * @param uri The URI of the file. 702 * @param offset The offset where the read begins. 703 * @param buffer The buffer to read into. 704 * @param nbytes Number of bytes to read. 705 * @param use_read_ahead Whether to use the read-ahead cache. 706 * @return Status 707 */ 708 Status read_ahead_impl( 709 const std::function<Status( 710 const URI&, off_t, void*, uint64_t, uint64_t, uint64_t*)>& read_fn, 711 const URI& uri, 712 const uint64_t offset, 713 void* const buffer, 714 const uint64_t nbytes, 715 const bool use_read_ahead); 716 717 /** 718 * Decrement the lock count of the given URI. 719 * 720 * @param uri The URI 721 * @param lock The filelock_t that is held, or INVALID_FILELOCK 722 * @return Status 723 */ 724 Status decr_lock_count(const URI& uri, bool* is_zero, filelock_t* lock) const; 725 726 /** 727 * Retrieves the backend-specific max number of parallel operations for VFS 728 * read. 729 */ 730 Status max_parallel_ops(const URI& uri, uint64_t* ops) const; 731 }; 732 733 } // namespace sm 734 } // namespace tiledb 735 736 #endif // TILEDB_VFS_H 737