1 /**
2  * @file   vfs.h
3  *
4  * @section LICENSE
5  *
6  * The MIT License
7  *
8  * @copyright Copyright (c) 2017-2021 TileDB, Inc.
9  *
10  * Permission is hereby granted, free of charge, to any person obtaining a copy
11  * of this software and associated documentation files (the "Software"), to deal
12  * in the Software without restriction, including without limitation the rights
13  * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
14  * copies of the Software, and to permit persons to whom the Software is
15  * furnished to do so, subject to the following conditions:
16  *
17  * The above copyright notice and this permission notice shall be included in
18  * all copies or substantial portions of the Software.
19  *
20  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
21  * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
22  * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
23  * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
24  * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
25  * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
26  * THE SOFTWARE.
27  *
28  * @section DESCRIPTION
29  *
30  * This file declares the VFS class.
31  */
32 
33 #ifndef TILEDB_VFS_H
34 #define TILEDB_VFS_H
35 
36 #include <functional>
37 #include <list>
38 #include <set>
39 #include <string>
40 #include <vector>
41 
42 #include "tiledb/common/macros.h"
43 #include "tiledb/common/status.h"
44 #include "tiledb/common/thread_pool.h"
45 #include "tiledb/sm/buffer/buffer.h"
46 #include "tiledb/sm/cache/lru_cache.h"
47 #include "tiledb/sm/config/config.h"
48 #include "tiledb/sm/filesystem/filelock.h"
49 #include "tiledb/sm/filesystem/mem_filesystem.h"
50 #include "tiledb/sm/misc/cancelable_tasks.h"
51 #include "tiledb/sm/misc/uri.h"
52 #include "tiledb/sm/stats/stats.h"
53 
54 #ifdef _WIN32
55 #include "tiledb/sm/filesystem/win.h"
56 #else
57 #include "tiledb/sm/filesystem/posix.h"
58 #endif
59 
60 #ifdef HAVE_GCS
61 #include "tiledb/sm/filesystem/gcs.h"
62 #endif
63 
64 #ifdef HAVE_S3
65 #include "tiledb/sm/filesystem/s3.h"
66 #endif
67 
68 #ifdef HAVE_HDFS
69 #include "tiledb/sm/filesystem/hdfs_filesystem.h"
70 #endif
71 
72 #ifdef HAVE_AZURE
73 #include "tiledb/sm/filesystem/azure.h"
74 #endif
75 
76 using namespace tiledb::common;
77 
78 namespace tiledb {
79 namespace sm {
80 
81 enum class Filesystem : uint8_t;
82 enum class VFSMode : uint8_t;
83 
84 /**
85  * This class implements a virtual filesystem that directs filesystem-related
86  * function execution to the appropriate backend based on the input URI.
87  */
88 class VFS {
89  public:
90   /* ********************************* */
91   /*     CONSTRUCTORS & DESTRUCTORS    */
92   /* ********************************* */
93 
94   /** Constructor. */
95   VFS();
96 
97   /** Destructor. */
98   ~VFS() = default;
99 
100   DISABLE_COPY_AND_COPY_ASSIGN(VFS);
101   DISABLE_MOVE_AND_MOVE_ASSIGN(VFS);
102 
103   /* ********************************* */
104   /*               API                 */
105   /* ********************************* */
106 
107   /**
108    * Returns the absolute path of the input string (mainly useful for
109    * posix URI's).
110    *
111    * @param path The input path.
112    * @return The string with the absolute path.
113    */
114   static std::string abs_path(const std::string& path);
115 
116   /**
117    * Return a config object containing the VFS parameters. All other non-VFS
   * parameters are set to default values.
119    */
120   Config config() const;
121 
122   /**
123    * Creates a directory.
124    *
125    * - On S3, this is a noop.
126    * - On all other backends, if the directory exists, the function
127    *   just succeeds without doing anything.
128    *
129    * @param uri The URI of the directory.
130    * @return Status
131    */
132   Status create_dir(const URI& uri) const;
133 
134   /**
135    * Creates an empty file.
136    *
137    * @param uri The URI of the file.
138    * @return Status
139    */
140   Status touch(const URI& uri) const;
141 
142   /**
143    * Cancels all background or queued tasks.
144    */
145   Status cancel_all_tasks();
146 
147   /**
148    * Creates an object store bucket.
149    *
150    * @param uri The name of the bucket to be created.
151    * @return Status
152    */
153   Status create_bucket(const URI& uri) const;
154 
155   /**
156    * Returns the size of the files in the input directory.
157    * This function is **recursive**, i.e., it will calculate
158    * the sum of the files in the entire directory tree rooted
159    * at `dir_name`.
160    *
161    * @param dir_name The input directory.
   * @param dir_size The directory size to be retrieved, as the
   *     sum of the sizes of all files in the directory tree.
164    * @return Status
165    */
166   Status dir_size(const URI& dir_name, uint64_t* dir_size) const;
167 
168   /**
169    * Deletes an object store bucket.
170    *
171    * @param uri The name of the bucket to be deleted.
172    * @return Status
173    */
174   Status remove_bucket(const URI& uri) const;
175 
176   /**
177    * Deletes the contents of an object store bucket.
178    *
179    * @param uri The name of the bucket to be emptied.
180    * @return Status
181    */
182   Status empty_bucket(const URI& uri) const;
183 
184   /**
185    * Removes a given directory (recursive)
186    *
187    * @param uri The uri of the directory to be removed
188    * @return Status
189    */
190   Status remove_dir(const URI& uri) const;
191 
192   /**
193    * Deletes a file.
194    *
195    * @param uri The URI of the file.
196    * @return Status
197    */
198   Status remove_file(const URI& uri) const;
199 
200   /**
201    * Locks a filelock.
202    *
203    * @param uri The URI of the filelock.
204    * @param lock A handle for the filelock (used in unlocking the
205    *     filelock).
206    * @param shared *True* if it is a shared lock, *false* if it is an
207    *     exclusive lock.
208    * @return Status
209    */
210   Status filelock_lock(const URI& uri, filelock_t* lock, bool shared) const;
211 
212   /**
213    * Unlocks a filelock.
214    *
   * @param uri The URI of the filelock.
217    * @return Status
218    */
219   Status filelock_unlock(const URI& uri) const;
220 
221   /**
222    * Retrieves the size of a file.
223    *
224    * @param uri The URI of the file.
225    * @param size The file size to be retrieved.
226    * @return Status
227    */
228   Status file_size(const URI& uri, uint64_t* size) const;
229 
230   /**
231    * Checks if a directory exists.
232    *
233    * @param uri The URI of the directory.
234    * @param is_dir Set to `true` if the directory exists and `false` otherwise.
235    * @return Status
236    *
237    * @note For S3, this function will return `true` if there is an object
238    *     with prefix `uri/` (TileDB will append `/` internally to `uri`
   *     only if it does not exist), and `false` otherwise.
240    */
241   Status is_dir(const URI& uri, bool* is_dir) const;
242 
243   /**
244    * Checks if a file exists.
245    *
246    * @param uri The URI of the file.
247    * @param is_file Set to `true` if the file exists and `false` otherwise.
248    * @return Status
249    */
250   Status is_file(const URI& uri, bool* is_file) const;
251 
252   /**
253    * Checks if an object store bucket exists.
254    *
255    * @param uri The name of the object store bucket.
   * @param is_bucket Set to `true` if the bucket exists and `false` otherwise.
257    * @return Status
258    */
259   Status is_bucket(const URI& uri, bool* is_bucket) const;
260 
261   /**
262    * Checks if an object-store bucket is empty.
263    *
264    * @param uri The name of the object store bucket.
   * @param is_empty Set to `true` if the bucket is empty and `false` otherwise.
   * @return Status
266    */
267   Status is_empty_bucket(const URI& uri, bool* is_empty) const;
268 
269   /**
270    * Initializes the virtual filesystem with the given configuration.
271    *
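   * @param parent_stats The parent stats to inherit from.
   * @param compute_tp Thread pool for compute-bound tasks.
   * @param io_tp Thread pool for io-bound tasks.
   * @param ctx_config Context configuration parameters.
   * @param vfs_config VFS configuration parameters.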
274    * @return Status
275    */
276   Status init(
277       stats::Stats* parent_stats,
278       ThreadPool* compute_tp,
279       ThreadPool* io_tp,
280       const Config* ctx_config,
281       const Config* vfs_config);
282 
283   /**
284    * Terminates the virtual system. Must only be called if init() returned
285    * successfully. The behavior is undefined if not successfully invoked prior
286    * to destructing this object.
287    *
288    * @return Status
289    */
290   Status terminate();
291 
292   /**
293    * Retrieves all the URIs that have the first input as parent.
294    *
295    * @param parent The target directory to list.
296    * @param uris The URIs that are contained in the parent.
297    * @return Status
298    */
299   Status ls(const URI& parent, std::vector<URI>* uris) const;
300 
301   /**
302    * Renames a file.
303    *
304    * @param old_uri The old URI.
305    * @param new_uri The new URI.
306    * @return Status
307    */
308   Status move_file(const URI& old_uri, const URI& new_uri);
309 
310   /**
311    * Renames a directory.
312    *
313    * @param old_uri The old URI.
314    * @param new_uri The new URI.
315    * @return Status
316    */
317   Status move_dir(const URI& old_uri, const URI& new_uri);
318 
319   /**
320    * Copies a file.
321    *
322    * @param old_uri The old URI.
323    * @param new_uri The new URI.
324    * @return Status
325    */
326   Status copy_file(const URI& old_uri, const URI& new_uri);
327 
328   /**
   * Copies a directory.
330    *
331    * @param old_uri The old URI.
332    * @param new_uri The new URI.
333    * @return Status
334    */
335   Status copy_dir(const URI& old_uri, const URI& new_uri);
336 
337   /**
338    * Reads from a file.
339    *
340    * @param uri The URI of the file.
341    * @param offset The offset where the read begins.
342    * @param buffer The buffer to read into.
343    * @param nbytes Number of bytes to read.
344    * @param use_read_ahead Whether to use the read-ahead cache.
345    * @return Status
346    */
347   Status read(
348       const URI& uri,
349       uint64_t offset,
350       void* buffer,
351       uint64_t nbytes,
352       bool use_read_ahead = true);
353 
354   /**
355    * Reads multiple regions from a file.
356    *
357    * @param uri The URI of the file.
358    * @param regions The list of regions to read. Each region is a tuple
359    *    `(file_offset, dest_buffer, nbytes)`.
   * @param thread_pool Thread pool to execute async read tasks on.
361    * @param tasks Vector to which new async read tasks are pushed.
362    * @param use_read_ahead Whether to use the read-ahead cache.
363    * @return Status
364    */
365   Status read_all(
366       const URI& uri,
367       const std::vector<std::tuple<uint64_t, void*, uint64_t>>& regions,
368       ThreadPool* thread_pool,
369       std::vector<ThreadPool::Task>* tasks,
370       bool use_read_ahead = true);
371 
372   /** Checks if a given filesystem is supported. */
373   bool supports_fs(Filesystem fs) const;
374 
375   /** Checks if the backend required to access the given URI is supported. */
376   bool supports_uri_scheme(const URI& uri) const;
377 
378   /**
379    * Syncs (flushes) a file. Note that for S3 this is a noop.
380    *
381    * @param uri The URI of the file.
382    * @return Status
383    */
384   Status sync(const URI& uri);
385 
386   /**
387    * Opens a file in a given mode.
388    *
389    *
390    * @param uri The URI of the file.
391    * @param mode The mode in which the file is opened:
392    *     - READ <br>
393    *       The file is opened for reading. An error is returned if the file
394    *       does not exist.
395    *     - WRITE <br>
396    *       The file is opened for writing. If the file exists, it will be
397    *       overwritten.
   *     - APPEND <br>
399    *       The file is opened for writing. If the file exists, the write
400    *       will start from the end of the file. Note that S3 does not
401    *       support this operation and, thus, an error will be thrown in
402    *       that case.
403    * @return Status
404    */
405   Status open_file(const URI& uri, VFSMode mode);
406 
407   /**
408    * Closes a file, flushing its contents to persistent storage.
409    *
410    * @param uri The URI of the file.
411    * @return Status
412    */
413   Status close_file(const URI& uri);
414 
415   /**
416    * Writes the contents of a buffer into a file.
417    *
418    * @param uri The URI of the file.
419    * @param buffer The buffer to write from.
420    * @param buffer_size The buffer size.
421    * @return Status
422    */
423   Status write(const URI& uri, const void* buffer, uint64_t buffer_size);
424 
425  private:
426   /* ********************************* */
427   /*        PRIVATE DATATYPES          */
428   /* ********************************* */
429 
/**
 * Bookkeeping for one batched read: the file range the batch covers plus
 * the individual regions that make it up.
 */
struct BatchedRead {
  /**
   * Builds a batch that holds exactly one region.
   *
   * @param region Tuple of the form `(offset, dest_buffer, nbytes)`. The
   *     batch starts out with this region's offset and byte count.
   */
  BatchedRead(const std::tuple<uint64_t, void*, uint64_t>& region)
      : offset(std::get<0>(region))
      , nbytes(std::get<2>(region))
      , regions(1, region) {
  }

  /** File offset at which the batch begins. */
  uint64_t offset;

  /** Byte count of the batch. */
  uint64_t nbytes;

  /**
   * The original regions making up the batch, each a tuple of the form
   * (offset, dest_buffer, nbytes).
   */
  std::vector<std::tuple<uint64_t, void*, uint64_t>> regions;
};
453 
454   /**
455    * Represents a sub-range of data within a URI file at a
456    * specific file offset.
457    */
458   struct ReadAheadBuffer {
459     /* ********************************* */
460     /*            CONSTRUCTORS           */
461     /* ********************************* */
462 
463     /** Value Constructor. */
ReadAheadBufferReadAheadBuffer464     ReadAheadBuffer(const uint64_t offset, Buffer&& buffer)
465         : offset_(offset)
466         , buffer_(std::move(buffer)) {
467     }
468 
469     /** Move Constructor. */
ReadAheadBufferReadAheadBuffer470     ReadAheadBuffer(ReadAheadBuffer&& other)
471         : offset_(other.offset_)
472         , buffer_(std::move(other.buffer_)) {
473     }
474 
475     /* ********************************* */
476     /*             OPERATORS             */
477     /* ********************************* */
478 
479     /** Move-Assign Operator. */
480     ReadAheadBuffer& operator=(ReadAheadBuffer&& other) {
481       offset_ = other.offset_;
482       buffer_ = std::move(other.buffer_);
483       return *this;
484     }
485 
486     DISABLE_COPY_AND_COPY_ASSIGN(ReadAheadBuffer);
487 
488     /* ********************************* */
489     /*             ATTRIBUTES            */
490     /* ********************************* */
491 
492     /** The offset within the associated URI. */
493     uint64_t offset_;
494 
495     /** The buffered data at `offset`. */
496     Buffer buffer_;
497   };
498 
499   /**
500    * An LRU cache of `ReadAheadBuffer` objects keyed by a URI string.
501    */
502   class ReadAheadCache : public LRUCache<std::string, ReadAheadBuffer> {
503    public:
504     /* ********************************* */
505     /*     CONSTRUCTORS & DESTRUCTORS    */
506     /* ********************************* */
507 
508     /** Constructor. */
ReadAheadCache(const uint64_t max_cached_buffers)509     ReadAheadCache(const uint64_t max_cached_buffers)
510         : LRUCache(max_cached_buffers) {
511     }
512 
513     /** Destructor. */
514     virtual ~ReadAheadCache() = default;
515 
516     /* ********************************* */
517     /*                API                */
518     /* ********************************* */
519 
520     /**
521      * Attempts to read a buffer from the cache.
522      *
523      * @param uri The URI associated with the buffer to cache.
524      * @param offset The offset that buffer starts at within the URI.
525      * @param buffer The buffer to cache.
526      * @param nbytes The number of bytes within the buffer.
527      * @param success True if `buffer` was read from the cache.
528      * @return Status
529      */
read(const URI & uri,const uint64_t offset,void * const buffer,const uint64_t nbytes,bool * const success)530     Status read(
531         const URI& uri,
532         const uint64_t offset,
533         void* const buffer,
534         const uint64_t nbytes,
535         bool* const success) {
536       assert(success);
537       *success = false;
538 
539       // Store the URI's string representation.
540       const std::string uri_str = uri.to_string();
541 
542       // Protect access to the derived LRUCache routines.
543       std::lock_guard<std::mutex> lg(lru_mtx_);
544 
545       // Check that a cached buffer exists for `uri`.
546       if (!has_item(uri_str))
547         return Status::Ok();
548 
549       // Store a reference to the cached buffer.
550       const ReadAheadBuffer* const ra_buffer = get_item(uri_str);
551 
552       // Check that the read offset is not below the offset of
553       // the cached buffer.
554       if (offset < ra_buffer->offset_)
555         return Status::Ok();
556 
557       // Calculate the offset within the cached buffer that corresponds
558       // to the requested read offset.
559       const uint64_t offset_in_buffer = offset - ra_buffer->offset_;
560 
561       // Check that both the start and end positions of the requested
562       // read range reside within the cached buffer.
563       if (offset_in_buffer + nbytes > ra_buffer->buffer_.size())
564         return Status::Ok();
565 
566       // Copy the subrange of the cached buffer that satisfies the caller's
567       // read request back into their output `buffer`.
568       std::memcpy(
569           buffer,
570           static_cast<uint8_t*>(ra_buffer->buffer_.data()) + offset_in_buffer,
571           nbytes);
572 
573       // Touch the item to make it the most recently used item.
574       touch_item(uri_str);
575 
576       *success = true;
577       return Status::Ok();
578     }
579 
580     /**
581      * Writes a cached buffer for the given uri.
582      *
583      * @param uri The URI associated with the buffer to cache.
584      * @param offset The offset that buffer starts at within the URI.
585      * @param buffer The buffer to cache.
586      * @return Status
587      */
insert(const URI & uri,const uint64_t offset,Buffer && buffer)588     Status insert(const URI& uri, const uint64_t offset, Buffer&& buffer) {
589       // Protect access to the derived LRUCache routines.
590       std::lock_guard<std::mutex> lg(lru_mtx_);
591 
592       const uint64_t size = buffer.size();
593       ReadAheadBuffer ra_buffer(offset, std::move(buffer));
594       return LRUCache<std::string, ReadAheadBuffer>::insert(
595           uri.to_string(), std::move(ra_buffer), size);
596     }
597 
598    private:
599     /* ********************************* */
600     /*         PRIVATE ATTRIBUTES        */
601     /* ********************************* */
602 
603     // Protects LRUCache routines.
604     std::mutex lru_mtx_;
605   };
606 
607   /* ********************************* */
608   /*         PRIVATE ATTRIBUTES        */
609   /* ********************************* */
610 
611 #ifdef HAVE_AZURE
612   Azure azure_;
613 #endif
614 
615 #ifdef HAVE_GCS
616   GCS gcs_;
617 #endif
618 
619 #ifdef HAVE_S3
620   S3 s3_;
621 #endif
622 
623 #ifdef _WIN32
624   Win win_;
625 #else
626   Posix posix_;
627 #endif
628 
629 #ifdef HAVE_HDFS
630   tdb_unique_ptr<hdfs::HDFS> hdfs_;
631 #endif
632 
633   /** The class stats. */
634   stats::Stats* stats_;
635 
636   /** The in-memory filesystem which is always supported */
637   MemFilesystem memfs_;
638 
639   /** Config. */
640   Config config_;
641 
642   /** `true` if the VFS object has been initialized. */
643   bool init_;
644 
645   /** The byte size to read-ahead for each read. */
646   uint64_t read_ahead_size_;
647 
648   /** The set with the supported filesystems. */
649   std::set<Filesystem> supported_fs_;
650 
651   /** Thread pool for compute-bound tasks. */
652   ThreadPool* compute_tp_;
653 
654   /** Thread pool for io-bound tasks. */
655   ThreadPool* io_tp_;
656 
657   /** Wrapper for tracking and canceling certain tasks on 'thread_pool' */
658   CancelableTasks cancelable_tasks_;
659 
660   /** The read-ahead cache. */
661   tdb_unique_ptr<ReadAheadCache> read_ahead_cache_;
662 
663   /* ********************************* */
664   /*          PRIVATE METHODS          */
665   /* ********************************* */
666 
667   /**
668    * Groups the given vector of regions to be read into a possibly smaller
669    * vector of batched reads.
670    *
671    * @param regions Vector of individual regions to be read. Each region is a
672    *    tuple `(file_offset, dest_buffer, nbytes)`.
673    * @param batches Vector storing the batched read information.
674    * @return Status
675    */
676   Status compute_read_batches(
677       const std::vector<std::tuple<uint64_t, void*, uint64_t>>& regions,
678       std::vector<BatchedRead>* batches) const;
679 
680   /**
681    * Reads from a file by calling the specific backend read function.
682    *
683    * @param uri The URI of the file.
684    * @param offset The offset where the read begins.
685    * @param buffer The buffer to read into.
686    * @param nbytes Number of bytes to read.
687    * @param use_read_ahead Whether to use the read-ahead cache.
688    * @return Status
689    */
690   Status read_impl(
691       const URI& uri,
692       uint64_t offset,
693       void* buffer,
694       uint64_t nbytes,
695       bool use_read_ahead);
696 
697   /**
698    * Executes a read, using the read-ahead cache as necessary.
699    *
700    * @param read_fn The read routine to execute.
701    * @param uri The URI of the file.
702    * @param offset The offset where the read begins.
703    * @param buffer The buffer to read into.
704    * @param nbytes Number of bytes to read.
705    * @param use_read_ahead Whether to use the read-ahead cache.
706    * @return Status
707    */
708   Status read_ahead_impl(
709       const std::function<Status(
710           const URI&, off_t, void*, uint64_t, uint64_t, uint64_t*)>& read_fn,
711       const URI& uri,
712       const uint64_t offset,
713       void* const buffer,
714       const uint64_t nbytes,
715       const bool use_read_ahead);
716 
717   /**
718    * Decrement the lock count of the given URI.
719    *
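   * @param uri The URI
   * @param is_zero Output flag; presumably set to `true` when the lock count
   *     reaches zero (to be confirmed against the implementation).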
721    * @param lock The filelock_t that is held, or INVALID_FILELOCK
722    * @return Status
723    */
724   Status decr_lock_count(const URI& uri, bool* is_zero, filelock_t* lock) const;
725 
726   /**
727    * Retrieves the backend-specific max number of parallel operations for VFS
728    * read.
729    */
730   Status max_parallel_ops(const URI& uri, uint64_t* ops) const;
731 };
732 
733 }  // namespace sm
734 }  // namespace tiledb
735 
736 #endif  // TILEDB_VFS_H
737