1 /**
2  * @file   vfs.cc
3  *
4  * @section LICENSE
5  *
6  * The MIT License
7  *
8  * @copyright Copyright (c) 2017-2021 TileDB, Inc.
9  *
10  * Permission is hereby granted, free of charge, to any person obtaining a copy
11  * of this software and associated documentation files (the "Software"), to deal
12  * in the Software without restriction, including without limitation the rights
13  * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
14  * copies of the Software, and to permit persons to whom the Software is
15  * furnished to do so, subject to the following conditions:
16  *
17  * The above copyright notice and this permission notice shall be included in
18  * all copies or substantial portions of the Software.
19  *
20  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
21  * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
22  * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
23  * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
24  * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
25  * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
26  * THE SOFTWARE.
27  *
28  * @section DESCRIPTION
29  *
30  * This file implements the VFS class.
31  */
32 
33 #include "tiledb/sm/filesystem/vfs.h"
34 #include "tiledb/common/heap_memory.h"
35 #include "tiledb/common/logger.h"
36 #include "tiledb/sm/buffer/buffer.h"
37 #include "tiledb/sm/enums/filesystem.h"
38 #include "tiledb/sm/enums/vfs_mode.h"
39 #include "tiledb/sm/filesystem/hdfs_filesystem.h"
40 #include "tiledb/sm/misc/parallel_functions.h"
41 #include "tiledb/sm/misc/utils.h"
42 #include "tiledb/sm/stats/global_stats.h"
43 
44 #include <iostream>
45 #include <list>
46 #include <sstream>
47 #include <unordered_map>
48 
49 using namespace tiledb::common;
50 
51 namespace tiledb {
52 namespace sm {
53 
54 /* ********************************* */
55 /*          GLOBAL VARIABLES         */
56 /* ********************************* */
57 
/**
 * Process-wide registry of the file locks currently held, keyed by the URI
 * string of the locked file. Each entry stores a pair of
 * (number of outstanding lock requests for that URI, filelock handle that
 * was obtained when the URI was first locked). This is shared across the
 * entire process.
 */
static std::unordered_map<std::string, std::pair<uint64_t, filelock_t>>
    process_filelocks_;

/** Mutex protecting the filelock process and the filelock counts map. */
static std::mutex filelock_mtx_;
67 
68 /* ********************************* */
69 /*     CONSTRUCTORS & DESTRUCTORS    */
70 /* ********************************* */
71 
VFS()72 VFS::VFS()
73     : stats_(nullptr)
74     , init_(false)
75     , read_ahead_size_(0)
76     , compute_tp_(nullptr)
77     , io_tp_(nullptr) {
78 #ifdef HAVE_AZURE
79   supported_fs_.insert(Filesystem::AZURE);
80 #endif
81 #ifdef HAVE_GCS
82   supported_fs_.insert(Filesystem::GCS);
83 #endif
84 #ifdef HAVE_HDFS
85   supported_fs_.insert(Filesystem::HDFS);
86 #endif
87 #ifdef HAVE_S3
88   supported_fs_.insert(Filesystem::S3);
89 #endif
90   supported_fs_.insert(Filesystem::MEMFS);
91 }
92 
93 /* ********************************* */
94 /*                API                */
95 /* ********************************* */
96 
abs_path(const std::string & path)97 std::string VFS::abs_path(const std::string& path) {
98   // workaround for older clang (llvm 3.5) compilers (issue #828)
99   std::string path_copy = path;
100 #ifdef _WIN32
101   if (Win::is_win_path(path))
102     return Win::uri_from_path(Win::abs_path(path));
103   else if (URI::is_file(path))
104     return Win::uri_from_path(Win::abs_path(Win::path_from_uri(path)));
105 #else
106   if (URI::is_file(path))
107     return Posix::abs_path(path);
108 #endif
109   if (URI::is_hdfs(path))
110     return path_copy;
111   if (URI::is_s3(path))
112     return path_copy;
113   if (URI::is_azure(path))
114     return path_copy;
115   if (URI::is_gcs(path))
116     return path_copy;
117   if (URI::is_memfs(path))
118     return path_copy;
119   // Certainly starts with "<resource>://" other than "file://"
120   return path_copy;
121 }
122 
config() const123 Config VFS::config() const {
124   return config_;
125 }
126 
create_dir(const URI & uri) const127 Status VFS::create_dir(const URI& uri) const {
128   if (!init_)
129     return LOG_STATUS(
130         Status::VFSError("Cannot create directory; VFS not initialized"));
131 
132   if (!uri.is_s3() && !uri.is_azure() && !uri.is_gcs()) {
133     bool is_dir;
134     RETURN_NOT_OK(this->is_dir(uri, &is_dir));
135     if (is_dir)
136       return Status::Ok();
137   }
138 
139   if (uri.is_file()) {
140 #ifdef _WIN32
141     return win_.create_dir(uri.to_path());
142 #else
143     return posix_.create_dir(uri.to_path());
144 #endif
145   }
146   if (uri.is_hdfs()) {
147 #ifdef HAVE_HDFS
148     return hdfs_->create_dir(uri);
149 #else
150     return LOG_STATUS(
151         Status::VFSError("TileDB was built without HDFS support"));
152 #endif
153   }
154   if (uri.is_s3()) {
155 #ifdef HAVE_S3
156     // It is a noop for S3
157     return Status::Ok();
158 #else
159     return LOG_STATUS(Status::VFSError("TileDB was built without S3 support"));
160 #endif
161   }
162   if (uri.is_azure()) {
163 #ifdef HAVE_AZURE
164     // It is a noop for Azure
165     return Status::Ok();
166 #else
167     return LOG_STATUS(
168         Status::VFSError("TileDB was built without Azure support"));
169 #endif
170   }
171   if (uri.is_gcs()) {
172 #ifdef HAVE_GCS
173     // It is a noop for GCS
174     return Status::Ok();
175 #else
176     return LOG_STATUS(Status::VFSError("TileDB was built without GCS support"));
177 #endif
178   }
179   if (uri.is_memfs()) {
180     return memfs_.create_dir(uri.to_path());
181   }
182   return LOG_STATUS(Status::VFSError(
183       std::string("Unsupported URI scheme: ") + uri.to_string()));
184 }
185 
dir_size(const URI & dir_name,uint64_t * dir_size) const186 Status VFS::dir_size(const URI& dir_name, uint64_t* dir_size) const {
187   if (!init_)
188     return LOG_STATUS(
189         Status::VFSError("Cannot get directory size; VFS not initialized"));
190 
191   // Sanity check
192   bool is_dir;
193   RETURN_NOT_OK(this->is_dir(dir_name, &is_dir));
194   if (!is_dir)
195     return LOG_STATUS(Status::VFSError(
196         std::string("Cannot get directory size; Input '") +
197         dir_name.to_string() + "' is not a directory"));
198 
199   // Get all files in the tree rooted at `dir_name` and add their sizes
200   *dir_size = 0;
201   uint64_t size;
202   std::list<URI> to_ls;
203   bool is_file;
204   to_ls.push_front(dir_name);
205   do {
206     auto uri = to_ls.front();
207     to_ls.pop_front();
208     std::vector<URI> children;
209     RETURN_NOT_OK(ls(uri, &children));
210     for (const auto& child : children) {
211       RETURN_NOT_OK(this->is_file(child, &is_file));
212       if (is_file) {
213         RETURN_NOT_OK(file_size(child, &size));
214         *dir_size += size;
215       } else {
216         to_ls.push_back(child);
217       }
218     }
219   } while (!to_ls.empty());
220 
221   return Status::Ok();
222 }
223 
touch(const URI & uri) const224 Status VFS::touch(const URI& uri) const {
225   if (!init_)
226     return LOG_STATUS(
227         Status::VFSError("Cannot touch file; VFS not initialized"));
228 
229   if (uri.is_file()) {
230 #ifdef _WIN32
231     return win_.touch(uri.to_path());
232 #else
233     return posix_.touch(uri.to_path());
234 #endif
235   }
236   if (uri.is_hdfs()) {
237 #ifdef HAVE_HDFS
238     return hdfs_->touch(uri);
239 #else
240     return LOG_STATUS(
241         Status::VFSError("TileDB was built without HDFS support"));
242 #endif
243   }
244   if (uri.is_s3()) {
245 #ifdef HAVE_S3
246     return s3_.touch(uri);
247 #else
248     return LOG_STATUS(Status::VFSError("TileDB was built without S3 support"));
249 #endif
250   }
251   if (uri.is_azure()) {
252 #ifdef HAVE_AZURE
253     return azure_.touch(uri);
254 #else
255     return LOG_STATUS(
256         Status::VFSError("TileDB was built without Azure support"));
257 #endif
258   }
259   if (uri.is_gcs()) {
260 #ifdef HAVE_GCS
261     return gcs_.touch(uri);
262 #else
263     return LOG_STATUS(Status::VFSError("TileDB was built without GCS support"));
264 #endif
265   }
266   if (uri.is_memfs()) {
267     return memfs_.touch(uri.to_path());
268   }
269   return LOG_STATUS(Status::VFSError(
270       std::string("Unsupported URI scheme: ") + uri.to_string()));
271 }
272 
cancel_all_tasks()273 Status VFS::cancel_all_tasks() {
274   if (!init_)
275     return LOG_STATUS(
276         Status::VFSError("Cannot cancel all tasks; VFS not initialized"));
277 
278   cancelable_tasks_.cancel_all_tasks();
279   return Status::Ok();
280 }
281 
create_bucket(const URI & uri) const282 Status VFS::create_bucket(const URI& uri) const {
283   if (!init_)
284     return LOG_STATUS(
285         Status::VFSError("Cannot create bucket; VFS not initialized"));
286 
287   if (uri.is_s3()) {
288 #ifdef HAVE_S3
289     return s3_.create_bucket(uri);
290 #else
291     (void)uri;
292     return LOG_STATUS(Status::VFSError(std::string("S3 is not supported")));
293 #endif
294   }
295   if (uri.is_azure()) {
296 #ifdef HAVE_AZURE
297     return azure_.create_container(uri);
298 #else
299     (void)uri;
300     return LOG_STATUS(Status::VFSError(std::string("Azure is not supported")));
301 #endif
302   }
303   if (uri.is_gcs()) {
304 #ifdef HAVE_GCS
305     return gcs_.create_bucket(uri);
306 #else
307     (void)uri;
308     return LOG_STATUS(Status::VFSError(std::string("GCS is not supported")));
309 #endif
310   }
311   return LOG_STATUS(Status::VFSError(
312       std::string("Cannot create bucket; Unsupported URI scheme: ") +
313       uri.to_string()));
314 }
315 
remove_bucket(const URI & uri) const316 Status VFS::remove_bucket(const URI& uri) const {
317   if (!init_)
318     return LOG_STATUS(
319         Status::VFSError("Cannot remove bucket; VFS not initialized"));
320 
321   if (uri.is_s3()) {
322 #ifdef HAVE_S3
323     return s3_.remove_bucket(uri);
324 #else
325     (void)uri;
326     return LOG_STATUS(Status::VFSError(std::string("S3 is not supported")));
327 #endif
328   }
329   if (uri.is_azure()) {
330 #ifdef HAVE_AZURE
331     return azure_.remove_container(uri);
332 #else
333     (void)uri;
334     return LOG_STATUS(Status::VFSError(std::string("Azure is not supported")));
335 #endif
336   }
337   if (uri.is_gcs()) {
338 #ifdef HAVE_GCS
339     return gcs_.remove_bucket(uri);
340 #else
341     (void)uri;
342     return LOG_STATUS(Status::VFSError(std::string("GCS is not supported")));
343 #endif
344   }
345   return LOG_STATUS(Status::VFSError(
346       std::string("Cannot remove bucket; Unsupported URI scheme: ") +
347       uri.to_string()));
348 }
349 
empty_bucket(const URI & uri) const350 Status VFS::empty_bucket(const URI& uri) const {
351   if (!init_)
352     return LOG_STATUS(
353         Status::VFSError("Cannot empty bucket; VFS not "
354                          "initialized"));
355 
356   if (uri.is_s3()) {
357 #ifdef HAVE_S3
358     return s3_.empty_bucket(uri);
359 #else
360     (void)uri;
361     return LOG_STATUS(Status::VFSError(std::string("S3 is not supported")));
362 #endif
363   }
364   if (uri.is_azure()) {
365 #ifdef HAVE_AZURE
366     return azure_.empty_container(uri);
367 #else
368     (void)uri;
369     return LOG_STATUS(Status::VFSError(std::string("Azure is not supported")));
370 #endif
371   }
372   if (uri.is_gcs()) {
373 #ifdef HAVE_GCS
374     return gcs_.empty_bucket(uri);
375 #else
376     (void)uri;
377     return LOG_STATUS(Status::VFSError(std::string("GCS is not supported")));
378 #endif
379   }
380   return LOG_STATUS(Status::VFSError(
381       std::string("Cannot empty bucket; Unsupported URI scheme: ") +
382       uri.to_string()));
383 }
384 
is_empty_bucket(const URI & uri,bool * is_empty) const385 Status VFS::is_empty_bucket(const URI& uri, bool* is_empty) const {
386   if (!init_)
387     return LOG_STATUS(
388         Status::VFSError("Cannot check if bucket is empty; "
389                          "VFS not initialized"));
390 
391   if (uri.is_s3()) {
392 #ifdef HAVE_S3
393     return s3_.is_empty_bucket(uri, is_empty);
394 #else
395     (void)uri;
396     (void)is_empty;
397     return LOG_STATUS(Status::VFSError(std::string("S3 is not supported")));
398 #endif
399   }
400   if (uri.is_azure()) {
401 #ifdef HAVE_AZURE
402     return azure_.is_empty_container(uri, is_empty);
403 #else
404     (void)uri;
405     (void)is_empty;
406     return LOG_STATUS(Status::VFSError(std::string("Azure is not supported")));
407 #endif
408   }
409   if (uri.is_gcs()) {
410 #ifdef HAVE_GCS
411     return gcs_.is_empty_bucket(uri, is_empty);
412 #else
413     (void)uri;
414     (void)is_empty;
415     return LOG_STATUS(Status::VFSError(std::string("GCS is not supported")));
416 #endif
417   }
418   return LOG_STATUS(Status::VFSError(
419       std::string("Cannot remove bucket; Unsupported URI scheme: ") +
420       uri.to_string()));
421 }
422 
remove_dir(const URI & uri) const423 Status VFS::remove_dir(const URI& uri) const {
424   if (!init_)
425     return LOG_STATUS(
426         Status::VFSError("Cannot remove directory; VFS not "
427                          "initialized"));
428 
429   if (uri.is_file()) {
430 #ifdef _WIN32
431     return win_.remove_dir(uri.to_path());
432 #else
433     return posix_.remove_dir(uri.to_path());
434 #endif
435   } else if (uri.is_hdfs()) {
436 #ifdef HAVE_HDFS
437     return hdfs_->remove_dir(uri);
438 #else
439     return LOG_STATUS(
440         Status::VFSError("TileDB was built without HDFS support"));
441 #endif
442   } else if (uri.is_s3()) {
443 #ifdef HAVE_S3
444     return s3_.remove_dir(uri);
445 #else
446     return LOG_STATUS(Status::VFSError("TileDB was built without S3 support"));
447 #endif
448   } else if (uri.is_azure()) {
449 #ifdef HAVE_AZURE
450     return azure_.remove_dir(uri);
451 #else
452     return LOG_STATUS(
453         Status::VFSError("TileDB was built without Azure support"));
454 #endif
455   } else if (uri.is_gcs()) {
456 #ifdef HAVE_GCS
457     return gcs_.remove_dir(uri);
458 #else
459     return LOG_STATUS(Status::VFSError("TileDB was built without GCS support"));
460 #endif
461   } else if (uri.is_memfs()) {
462     return memfs_.remove(uri.to_path(), true);
463   } else {
464     return LOG_STATUS(
465         Status::VFSError("Unsupported URI scheme: " + uri.to_string()));
466   }
467 }
468 
remove_file(const URI & uri) const469 Status VFS::remove_file(const URI& uri) const {
470   if (!init_)
471     return LOG_STATUS(
472         Status::VFSError("Cannot remove file; VFS not initialized"));
473 
474   if (uri.is_file()) {
475 #ifdef _WIN32
476     return win_.remove_file(uri.to_path());
477 #else
478     return posix_.remove_file(uri.to_path());
479 #endif
480   }
481   if (uri.is_hdfs()) {
482 #ifdef HAVE_HDFS
483     return hdfs_->remove_file(uri);
484 #else
485     return LOG_STATUS(
486         Status::VFSError("TileDB was built without HDFS support"));
487 #endif
488   }
489   if (uri.is_s3()) {
490 #ifdef HAVE_S3
491     return s3_.remove_object(uri);
492 #else
493     return LOG_STATUS(Status::VFSError("TileDB was built without S3 support"));
494 #endif
495   }
496   if (uri.is_azure()) {
497 #ifdef HAVE_AZURE
498     return azure_.remove_blob(uri);
499 #else
500     return LOG_STATUS(
501         Status::VFSError("TileDB was built without Azure support"));
502 #endif
503   }
504   if (uri.is_gcs()) {
505 #ifdef HAVE_GCS
506     return gcs_.remove_object(uri);
507 #else
508     return LOG_STATUS(Status::VFSError("TileDB was built without GCS support"));
509 #endif
510   }
511   if (uri.is_memfs()) {
512     return memfs_.remove(uri.to_path(), false);
513   }
514   return LOG_STATUS(
515       Status::VFSError("Unsupported URI scheme: " + uri.to_string()));
516 }
517 
filelock_lock(const URI & uri,filelock_t * lock,bool shared) const518 Status VFS::filelock_lock(const URI& uri, filelock_t* lock, bool shared) const {
519   if (!init_)
520     return LOG_STATUS(
521         Status::VFSError("Cannot lock filelock; VFS not initialized"));
522 
523   // Get config
524   bool found = false;
525   bool enable_filelocks = false;
526   RETURN_NOT_OK(config_.get<bool>(
527       "vfs.file.enable_filelocks", &enable_filelocks, &found));
528   assert(found);
529 
530   if (!enable_filelocks)
531     return Status::Ok();
532 
533   // Hold the lock while updating counts and performing the lock.
534   std::unique_lock<std::mutex> lck(filelock_mtx_);
535 
536   auto it = process_filelocks_.find(uri.to_string());
537   if (it != process_filelocks_.end()) {
538     it->second.first++;
539     // we need to return the lock for the xlock semantics
540     *lock = it->second.second;
541     return Status::Ok();
542   }
543 
544   // We must hold the fd in the global map in order to free from any context
545   if (uri.is_file()) {
546 #ifdef _WIN32
547     auto st = win_.filelock_lock(uri.to_path(), lock, shared);
548 #else
549     auto st = posix_.filelock_lock(uri.to_path(), lock, shared);
550 #endif
551 
552     if (st.ok())
553       process_filelocks_[uri.to_string()] = {1, *lock};
554     return st;
555   }
556 
557   if (uri.is_memfs()) {
558     return Status::Ok();
559   }
560   if (uri.is_hdfs()) {
561 #ifdef HAVE_HDFS
562     return Status::Ok();
563 #else
564     return LOG_STATUS(
565         Status::VFSError("TileDB was built without HDFS support"));
566 #endif
567   }
568   if (uri.is_s3()) {
569 #ifdef HAVE_S3
570     return Status::Ok();
571 #else
572     return LOG_STATUS(Status::VFSError("TileDB was built without S3 support"));
573 #endif
574   }
575   if (uri.is_azure()) {
576 #ifdef HAVE_AZURE
577     return Status::Ok();
578 #else
579     return LOG_STATUS(
580         Status::VFSError("TileDB was built without Azure support"));
581 #endif
582   }
583   if (uri.is_gcs()) {
584 #ifdef HAVE_GCS
585     return Status::Ok();
586 #else
587     return LOG_STATUS(Status::VFSError("TileDB was built without GCS support"));
588 #endif
589   }
590   return LOG_STATUS(
591       Status::VFSError("Unsupported URI scheme: " + uri.to_string()));
592 }
593 
filelock_unlock(const URI & uri) const594 Status VFS::filelock_unlock(const URI& uri) const {
595   if (!init_)
596     return LOG_STATUS(
597         Status::VFSError("Cannot unlock filelock; VFS not initialized"));
598 
599   // Get config
600   bool found = false;
601   bool enable_filelocks = false;
602   RETURN_NOT_OK(config_.get<bool>(
603       "vfs.file.enable_filelocks", &enable_filelocks, &found));
604   assert(found);
605 
606   if (!enable_filelocks)
607     return Status::Ok();
608 
609   // Hold the lock while updating counts and performing the unlock.
610   std::unique_lock<std::mutex> lck(filelock_mtx_);
611 
612   // Decrement the lock counter and return if the counter is still > 0.
613   bool should_unlock = false;
614   filelock_t fd = INVALID_FILELOCK;
615   Status st = decr_lock_count(uri, &should_unlock, &fd);
616   if (!st.ok() || !should_unlock) {
617     return st;
618   }
619 
620   if (uri.is_file()) {
621 #ifdef _WIN32
622     return win_.filelock_unlock(fd);
623 #else
624     return posix_.filelock_unlock(fd);
625 #endif
626   }
627   if (uri.is_hdfs()) {
628 #ifdef HAVE_HDFS
629     return Status::Ok();
630 #else
631     return LOG_STATUS(
632         Status::VFSError("TileDB was built without HDFS support"));
633 #endif
634   }
635   if (uri.is_s3()) {
636 #ifdef HAVE_S3
637     return Status::Ok();
638 #else
639     return LOG_STATUS(Status::VFSError("TileDB was built without S3 support"));
640 #endif
641   }
642   if (uri.is_azure()) {
643 #ifdef HAVE_AZURE
644     return Status::Ok();
645 #else
646     return LOG_STATUS(
647         Status::VFSError("TileDB was built without Azure support"));
648 #endif
649   }
650   if (uri.is_gcs()) {
651 #ifdef HAVE_GCS
652     return Status::Ok();
653 #else
654     return LOG_STATUS(Status::VFSError("TileDB was built without GCS support"));
655 #endif
656   }
657   return LOG_STATUS(
658       Status::VFSError("Unsupported URI scheme: " + uri.to_string()));
659 }
660 
decr_lock_count(const URI & uri,bool * is_zero,filelock_t * lock) const661 Status VFS::decr_lock_count(
662     const URI& uri, bool* is_zero, filelock_t* lock) const {
663   auto it = process_filelocks_.find(uri.to_string());
664   if (it == process_filelocks_.end()) {
665     return LOG_STATUS(
666         Status::VFSError("No lock counter for URI " + uri.to_string()));
667   } else if (it->second.first == 0) {
668     return LOG_STATUS(
669         Status::VFSError("Invalid lock count for URI " + uri.to_string()));
670   }
671 
672   it->second.first--;
673 
674   if (it->second.first == 0) {
675     *is_zero = true;
676     *lock = it->second.second;
677     process_filelocks_.erase(it);
678   } else {
679     *is_zero = false;
680   }
681   return Status::Ok();
682 }
683 
max_parallel_ops(const URI & uri,uint64_t * ops) const684 Status VFS::max_parallel_ops(const URI& uri, uint64_t* ops) const {
685   if (!init_)
686     return LOG_STATUS(
687         Status::VFSError("Cannot get max parallel ops; VFS not initialized"));
688 
689   bool found;
690   *ops = 0;
691 
692   if (uri.is_file()) {
693     RETURN_NOT_OK(
694         config_.get<uint64_t>("vfs.file.max_parallel_ops", ops, &found));
695     assert(found);
696   } else if (uri.is_hdfs()) {
697     // HDFS backend is currently serial.
698     *ops = 1;
699   } else if (uri.is_s3()) {
700     RETURN_NOT_OK(
701         config_.get<uint64_t>("vfs.s3.max_parallel_ops", ops, &found));
702     assert(found);
703   } else if (uri.is_azure()) {
704     RETURN_NOT_OK(
705         config_.get<uint64_t>("vfs.azure.max_parallel_ops", ops, &found));
706     assert(found);
707   } else if (uri.is_gcs()) {
708     RETURN_NOT_OK(
709         config_.get<uint64_t>("vfs.gcs.max_parallel_ops", ops, &found));
710     assert(found);
711   } else {
712     *ops = 1;
713   }
714 
715   return Status::Ok();
716 }
717 
file_size(const URI & uri,uint64_t * size) const718 Status VFS::file_size(const URI& uri, uint64_t* size) const {
719   if (!init_)
720     return LOG_STATUS(
721         Status::VFSError("Cannot get file size; VFS not initialized"));
722 
723   if (uri.is_file()) {
724 #ifdef _WIN32
725     return win_.file_size(uri.to_path(), size);
726 #else
727     return posix_.file_size(uri.to_path(), size);
728 #endif
729   }
730   if (uri.is_hdfs()) {
731 #ifdef HAVE_HDFS
732     return hdfs_->file_size(uri, size);
733 #else
734     return LOG_STATUS(
735         Status::VFSError("TileDB was built without HDFS support"));
736 #endif
737   }
738   if (uri.is_s3()) {
739 #ifdef HAVE_S3
740     return s3_.object_size(uri, size);
741 #else
742     return LOG_STATUS(Status::VFSError("TileDB was built without S3 support"));
743 #endif
744   }
745   if (uri.is_azure()) {
746 #ifdef HAVE_AZURE
747     return azure_.blob_size(uri, size);
748 #else
749     return LOG_STATUS(
750         Status::VFSError("TileDB was built without Azure support"));
751 #endif
752   }
753   if (uri.is_gcs()) {
754 #ifdef HAVE_GCS
755     return gcs_.object_size(uri, size);
756 #else
757     return LOG_STATUS(Status::VFSError("TileDB was built without GCS support"));
758 #endif
759   }
760   if (uri.is_memfs()) {
761     return memfs_.file_size(uri.to_path(), size);
762   }
763   return LOG_STATUS(
764       Status::VFSError("Unsupported URI scheme: " + uri.to_string()));
765 }
766 
is_dir(const URI & uri,bool * is_dir) const767 Status VFS::is_dir(const URI& uri, bool* is_dir) const {
768   if (!init_)
769     return LOG_STATUS(
770         Status::VFSError("Cannot check directory; VFS not initialized"));
771 
772   if (uri.is_file()) {
773 #ifdef _WIN32
774     *is_dir = win_.is_dir(uri.to_path());
775 #else
776     *is_dir = posix_.is_dir(uri.to_path());
777 #endif
778     return Status::Ok();
779   }
780   if (uri.is_hdfs()) {
781 #ifdef HAVE_HDFS
782     return hdfs_->is_dir(uri, is_dir);
783 #else
784     *is_dir = false;
785     return LOG_STATUS(
786         Status::VFSError("TileDB was built without HDFS support"));
787 #endif
788   }
789   if (uri.is_s3()) {
790 #ifdef HAVE_S3
791     return s3_.is_dir(uri, is_dir);
792 #else
793     *is_dir = false;
794     return LOG_STATUS(Status::VFSError("TileDB was built without S3 support"));
795 #endif
796   }
797   if (uri.is_azure()) {
798 #ifdef HAVE_AZURE
799     return azure_.is_dir(uri, is_dir);
800 #else
801     *is_dir = false;
802     return LOG_STATUS(
803         Status::VFSError("TileDB was built without Azure support"));
804 #endif
805   }
806   if (uri.is_gcs()) {
807 #ifdef HAVE_GCS
808     return gcs_.is_dir(uri, is_dir);
809 #else
810     *is_dir = false;
811     return LOG_STATUS(Status::VFSError("TileDB was built without GCS support"));
812 #endif
813   }
814   if (uri.is_memfs()) {
815     *is_dir = memfs_.is_dir(uri.to_path());
816     return Status::Ok();
817   }
818   return LOG_STATUS(
819       Status::VFSError("Unsupported URI scheme: " + uri.to_string()));
820 }
821 
is_file(const URI & uri,bool * is_file) const822 Status VFS::is_file(const URI& uri, bool* is_file) const {
823   if (!init_)
824     return LOG_STATUS(
825         Status::VFSError("Cannot check file; VFS not initialized"));
826 
827   if (uri.is_file()) {
828 #ifdef _WIN32
829     *is_file = win_.is_file(uri.to_path());
830 #else
831     *is_file = posix_.is_file(uri.to_path());
832 #endif
833     return Status::Ok();
834   }
835   if (uri.is_hdfs()) {
836 #ifdef HAVE_HDFS
837     return hdfs_->is_file(uri, is_file);
838 #else
839     *is_file = false;
840     return LOG_STATUS(
841         Status::VFSError("TileDB was built without HDFS support"));
842 #endif
843   }
844   if (uri.is_s3()) {
845 #ifdef HAVE_S3
846     RETURN_NOT_OK(s3_.is_object(uri, is_file));
847     return Status::Ok();
848 #else
849     *is_file = false;
850     return LOG_STATUS(Status::VFSError("TileDB was built without S3 support"));
851 #endif
852   }
853   if (uri.is_azure()) {
854 #ifdef HAVE_AZURE
855     return azure_.is_blob(uri, is_file);
856 #else
857     *is_file = false;
858     return LOG_STATUS(
859         Status::VFSError("TileDB was built without Azure support"));
860 #endif
861   }
862   if (uri.is_gcs()) {
863 #ifdef HAVE_GCS
864     return gcs_.is_object(uri, is_file);
865 #else
866     *is_file = false;
867     return LOG_STATUS(Status::VFSError("TileDB was built without GCS support"));
868 #endif
869   }
870   if (uri.is_memfs()) {
871     *is_file = memfs_.is_file(uri.to_path());
872     return Status::Ok();
873   }
874   return LOG_STATUS(
875       Status::VFSError("Unsupported URI scheme: " + uri.to_string()));
876 }
877 
is_bucket(const URI & uri,bool * is_bucket) const878 Status VFS::is_bucket(const URI& uri, bool* is_bucket) const {
879   if (!init_)
880     return LOG_STATUS(
881         Status::VFSError("Cannot check bucket; VFS not initialized"));
882 
883   if (uri.is_s3()) {
884 #ifdef HAVE_S3
885     RETURN_NOT_OK(s3_.is_bucket(uri, is_bucket));
886     return Status::Ok();
887 #else
888     *is_bucket = false;
889     return LOG_STATUS(Status::VFSError("TileDB was built without S3 support"));
890 #endif
891   }
892   if (uri.is_azure()) {
893 #ifdef HAVE_AZURE
894     RETURN_NOT_OK(azure_.is_container(uri, is_bucket));
895     return Status::Ok();
896 #else
897     *is_bucket = false;
898     return LOG_STATUS(
899         Status::VFSError("TileDB was built without Azure support"));
900 #endif
901   }
902   if (uri.is_gcs()) {
903 #ifdef HAVE_GCS
904     RETURN_NOT_OK(gcs_.is_bucket(uri, is_bucket));
905     return Status::Ok();
906 #else
907     *is_bucket = false;
908     return LOG_STATUS(Status::VFSError("TileDB was built without GCS support"));
909 #endif
910   }
911 
912   return LOG_STATUS(
913       Status::VFSError("Unsupported URI scheme: " + uri.to_string()));
914 }
915 
/**
 * Initializes the VFS: creates its stats node, stores the thread pools,
 * builds the effective configuration, constructs the read-ahead cache and
 * initializes every filesystem backend compiled into this build.
 *
 * `init_` is set to true only at the very end, so any early error return
 * leaves the VFS flagged as uninitialized.
 *
 * @param parent_stats Parent stats node; dereferenced unconditionally, so it
 *     must not be null.
 * @param compute_tp Thread pool stored in `compute_tp_`; asserted non-null.
 * @param io_tp Thread pool stored in `io_tp_` and handed to the backends;
 *     asserted non-null.
 * @param ctx_config Optional configuration copied into `config_` first.
 * @param vfs_config Optional configuration applied on top via
 *     `Config::inherit`.
 * @return Ok on success, otherwise the first error from a config lookup or
 *     from the HDFS/S3/Azure initialization.
 */
Status VFS::init(
    stats::Stats* const parent_stats,
    ThreadPool* const compute_tp,
    ThreadPool* const io_tp,
    const Config* const ctx_config,
    const Config* const vfs_config) {
  // NOTE(review): `parent_stats` is dereferenced without a null check,
  // unlike the thread pools asserted just below.
  stats_ = parent_stats->create_child("VFS");

  assert(compute_tp);
  assert(io_tp);
  compute_tp_ = compute_tp;
  io_tp_ = io_tp;

  // Build the effective config: start from the context config (if any) and
  // then apply the VFS-specific config (if any) on top of it.
  if (ctx_config)
    config_ = *ctx_config;
  if (vfs_config)
    config_.inherit(*vfs_config);

  // Construct the read-ahead cache.
  bool found = false;
  uint64_t read_ahead_cache_size = 0;
  RETURN_NOT_OK(config_.get<uint64_t>(
      "vfs.read_ahead_cache_size", &read_ahead_cache_size, &found));
  assert(found);
  read_ahead_cache_ = tdb_unique_ptr<ReadAheadCache>(
      tdb_new(ReadAheadCache, read_ahead_cache_size));

  // Store the read-ahead size.
  RETURN_NOT_OK(
      config_.get<uint64_t>("vfs.read_ahead_size", &read_ahead_size_, &found));
  assert(found);

  // A failure in the HDFS, S3 or Azure initialization aborts init().
#ifdef HAVE_HDFS
  hdfs_ = tdb_unique_ptr<hdfs::HDFS>(tdb_new(hdfs::HDFS));
  RETURN_NOT_OK(hdfs_->init(config_));
#endif

#ifdef HAVE_S3
  RETURN_NOT_OK(s3_.init(stats_, config_, io_tp_));
#endif

#ifdef HAVE_AZURE
  RETURN_NOT_OK(azure_.init(config_, io_tp_));
#endif

#ifdef HAVE_GCS
  // Unlike the backends above, a GCS failure is only logged and does not
  // abort the initialization.
  Status st = gcs_.init(config_, io_tp_);
  if (!st.ok()) {
    // We should print some warning here; this LOG_STATUS only prints in
    // verbose mode. Since this is called during the initialization of the
    // context, we cannot report the error the normal way, i.e. by setting it
    // on the context.
    LOG_STATUS(Status::GCSError(
        "GCS failed to initialize, GCS support will not be available in this "
        "context: " +
        st.message()));
  }
#endif

  // The result of the local filesystem initialization, if any, is not
  // checked here.
#ifdef _WIN32
  win_.init(config_, io_tp_);
#else
  posix_.init(config_, io_tp_);
#endif

  init_ = true;

  return Status::Ok();
}
985 
terminate()986 Status VFS::terminate() {
987 #ifdef HAVE_S3
988   return s3_.disconnect();
989 #endif
990 
991   return Status::Ok();
992 }
993 
ls(const URI & parent,std::vector<URI> * uris) const994 Status VFS::ls(const URI& parent, std::vector<URI>* uris) const {
995   if (!init_)
996     return LOG_STATUS(Status::VFSError("Cannot list; VFS not initialized"));
997 
998   std::vector<std::string> paths;
999   if (parent.is_file()) {
1000 #ifdef _WIN32
1001     RETURN_NOT_OK(win_.ls(parent.to_path(), &paths));
1002 #else
1003     RETURN_NOT_OK(posix_.ls(parent.to_path(), &paths));
1004 #endif
1005   } else if (parent.is_hdfs()) {
1006 #ifdef HAVE_HDFS
1007     RETURN_NOT_OK(hdfs_->ls(parent, &paths));
1008 #else
1009     return LOG_STATUS(
1010         Status::VFSError("TileDB was built without HDFS support"));
1011 #endif
1012   } else if (parent.is_s3()) {
1013 #ifdef HAVE_S3
1014     RETURN_NOT_OK(s3_.ls(parent, &paths));
1015 #else
1016     return LOG_STATUS(Status::VFSError("TileDB was built without S3 support"));
1017 #endif
1018   } else if (parent.is_azure()) {
1019 #ifdef HAVE_AZURE
1020     RETURN_NOT_OK(azure_.ls(parent, &paths));
1021 #else
1022     return LOG_STATUS(
1023         Status::VFSError("TileDB was built without Azure support"));
1024 #endif
1025   } else if (parent.is_gcs()) {
1026 #ifdef HAVE_GCS
1027     RETURN_NOT_OK(gcs_.ls(parent, &paths));
1028 #else
1029     return LOG_STATUS(Status::VFSError("TileDB was built without GCS support"));
1030 #endif
1031   } else if (parent.is_memfs()) {
1032     RETURN_NOT_OK(memfs_.ls(parent.to_path(), &paths));
1033     // URI class expects paths to be prepended
1034     for (std::string& path : paths) {
1035       path.insert(0, "mem://");
1036     }
1037   } else {
1038     return LOG_STATUS(
1039         Status::VFSError("Unsupported URI scheme: " + parent.to_string()));
1040   }
1041   parallel_sort(compute_tp_, paths.begin(), paths.end());
1042   for (auto& path : paths) {
1043     uris->emplace_back(path);
1044   }
1045   return Status::Ok();
1046 }
1047 
move_file(const URI & old_uri,const URI & new_uri)1048 Status VFS::move_file(const URI& old_uri, const URI& new_uri) {
1049   if (!init_)
1050     return LOG_STATUS(
1051         Status::VFSError("Cannot move file; VFS not initialized"));
1052 
1053   // If new_uri exists, delete it or raise an error based on `force`
1054   bool is_file;
1055   RETURN_NOT_OK(this->is_file(new_uri, &is_file));
1056   if (is_file)
1057     RETURN_NOT_OK(remove_file(new_uri));
1058 
1059   // File
1060   if (old_uri.is_file()) {
1061     if (new_uri.is_file()) {
1062 #ifdef _WIN32
1063       return win_.move_path(old_uri.to_path(), new_uri.to_path());
1064 #else
1065       return posix_.move_path(old_uri.to_path(), new_uri.to_path());
1066 #endif
1067     }
1068     return LOG_STATUS(Status::VFSError(
1069         "Moving files across filesystems is not supported yet"));
1070   }
1071 
1072   // HDFS
1073   if (old_uri.is_hdfs()) {
1074     if (new_uri.is_hdfs())
1075 #ifdef HAVE_HDFS
1076       return hdfs_->move_path(old_uri, new_uri);
1077 #else
1078       return LOG_STATUS(
1079           Status::VFSError("TileDB was built without HDFS support"));
1080 #endif
1081     return LOG_STATUS(Status::VFSError(
1082         "Moving files across filesystems is not supported yet"));
1083   }
1084 
1085   // S3
1086   if (old_uri.is_s3()) {
1087     if (new_uri.is_s3())
1088 #ifdef HAVE_S3
1089       return s3_.move_object(old_uri, new_uri);
1090 #else
1091       return LOG_STATUS(
1092           Status::VFSError("TileDB was built without S3 support"));
1093 #endif
1094     return LOG_STATUS(Status::VFSError(
1095         "Moving files across filesystems is not supported yet"));
1096   }
1097 
1098   // Azure
1099   if (old_uri.is_azure()) {
1100     if (new_uri.is_azure())
1101 #ifdef HAVE_AZURE
1102       return azure_.move_object(old_uri, new_uri);
1103 #else
1104       return LOG_STATUS(
1105           Status::VFSError("TileDB was built without Azure support"));
1106 #endif
1107     return LOG_STATUS(Status::VFSError(
1108         "Moving files across filesystems is not supported yet"));
1109   }
1110 
1111   // GCS
1112   if (old_uri.is_gcs()) {
1113     if (new_uri.is_gcs())
1114 #ifdef HAVE_GCS
1115       return gcs_.move_object(old_uri, new_uri);
1116 #else
1117       return LOG_STATUS(
1118           Status::VFSError("TileDB was built without GCS support"));
1119 #endif
1120     return LOG_STATUS(Status::VFSError(
1121         "Moving files across filesystems is not supported yet"));
1122   }
1123 
1124   // In-memory filesystem
1125   if (old_uri.is_memfs()) {
1126     if (new_uri.is_memfs()) {
1127       return memfs_.move(old_uri.to_path(), new_uri.to_path());
1128     }
1129     return LOG_STATUS(Status::VFSError(
1130         "Moving files across filesystems is not supported yet"));
1131   }
1132 
1133   // Unsupported filesystem
1134   return LOG_STATUS(Status::VFSError(
1135       "Unsupported URI schemes: " + old_uri.to_string() + ", " +
1136       new_uri.to_string()));
1137 }
1138 
move_dir(const URI & old_uri,const URI & new_uri)1139 Status VFS::move_dir(const URI& old_uri, const URI& new_uri) {
1140   if (!init_)
1141     return LOG_STATUS(
1142         Status::VFSError("Cannot move directory; VFS not initialized"));
1143 
1144   // File
1145   if (old_uri.is_file()) {
1146     if (new_uri.is_file()) {
1147 #ifdef _WIN32
1148       return win_.move_path(old_uri.to_path(), new_uri.to_path());
1149 #else
1150       return posix_.move_path(old_uri.to_path(), new_uri.to_path());
1151 #endif
1152     }
1153     return LOG_STATUS(Status::VFSError(
1154         "Moving files across filesystems is not supported yet"));
1155   }
1156 
1157   // HDFS
1158   if (old_uri.is_hdfs()) {
1159     if (new_uri.is_hdfs())
1160 #ifdef HAVE_HDFS
1161       return hdfs_->move_path(old_uri, new_uri);
1162 #else
1163       return LOG_STATUS(
1164           Status::VFSError("TileDB was built without HDFS support"));
1165 #endif
1166     return LOG_STATUS(Status::VFSError(
1167         "Moving files across filesystems is not supported yet"));
1168   }
1169 
1170   // S3
1171   if (old_uri.is_s3()) {
1172     if (new_uri.is_s3())
1173 #ifdef HAVE_S3
1174       return s3_.move_dir(old_uri, new_uri);
1175 #else
1176       return LOG_STATUS(
1177           Status::VFSError("TileDB was built without S3 support"));
1178 #endif
1179     return LOG_STATUS(Status::VFSError(
1180         "Moving files across filesystems is not supported yet"));
1181   }
1182 
1183   // Azure
1184   if (old_uri.is_azure()) {
1185     if (new_uri.is_azure())
1186 #ifdef HAVE_AZURE
1187       return azure_.move_dir(old_uri, new_uri);
1188 #else
1189       return LOG_STATUS(
1190           Status::VFSError("TileDB was built without Azure support"));
1191 #endif
1192     return LOG_STATUS(Status::VFSError(
1193         "Moving files across filesystems is not supported yet"));
1194   }
1195 
1196   // GCS
1197   if (old_uri.is_gcs()) {
1198     if (new_uri.is_gcs())
1199 #ifdef HAVE_GCS
1200       return gcs_.move_dir(old_uri, new_uri);
1201 #else
1202       return LOG_STATUS(
1203           Status::VFSError("TileDB was built without GCS support"));
1204 #endif
1205     return LOG_STATUS(Status::VFSError(
1206         "Moving files across filesystems is not supported yet"));
1207   }
1208 
1209   // In-memory filesystem
1210   if (old_uri.is_memfs()) {
1211     if (new_uri.is_memfs()) {
1212       return memfs_.move(old_uri.to_path(), new_uri.to_path());
1213     }
1214     return LOG_STATUS(Status::VFSError(
1215         "Moving files across filesystems is not supported yet"));
1216   }
1217 
1218   // Unsupported filesystem
1219   return LOG_STATUS(Status::VFSError(
1220       "Unsupported URI schemes: " + old_uri.to_string() + ", " +
1221       new_uri.to_string()));
1222 }
1223 
copy_file(const URI & old_uri,const URI & new_uri)1224 Status VFS::copy_file(const URI& old_uri, const URI& new_uri) {
1225   if (!init_)
1226     return LOG_STATUS(
1227         Status::VFSError("Cannot copy file; VFS not initialized"));
1228 
1229   // If new_uri exists, delete it or raise an error based on `force`
1230   bool is_file;
1231   RETURN_NOT_OK(this->is_file(new_uri, &is_file));
1232   if (is_file)
1233     RETURN_NOT_OK(remove_file(new_uri));
1234 
1235   // File
1236   if (old_uri.is_file()) {
1237     if (new_uri.is_file()) {
1238 #ifdef _WIN32
1239       return LOG_STATUS(Status::IOError(
1240           std::string("Copying files on Windows is not yet supported.")));
1241 #else
1242       return posix_.copy_file(old_uri.to_path(), new_uri.to_path());
1243 #endif
1244     }
1245     return LOG_STATUS(Status::VFSError(
1246         "Copying files across filesystems is not supported yet"));
1247   }
1248 
1249   // HDFS
1250   if (old_uri.is_hdfs()) {
1251     if (new_uri.is_hdfs())
1252 #ifdef HAVE_HDFS
1253       return LOG_STATUS(Status::IOError(
1254           std::string("Copying files on HDFS is not yet supported.")));
1255 #else
1256       return LOG_STATUS(
1257           Status::VFSError("TileDB was built without HDFS support"));
1258 #endif
1259     return LOG_STATUS(Status::VFSError(
1260         "Copying files across filesystems is not supported yet"));
1261   }
1262 
1263   // S3
1264   if (old_uri.is_s3()) {
1265     if (new_uri.is_s3())
1266 #ifdef HAVE_S3
1267       return s3_.copy_file(old_uri, new_uri);
1268 #else
1269       return LOG_STATUS(
1270           Status::VFSError("TileDB was built without S3 support"));
1271 #endif
1272     return LOG_STATUS(Status::VFSError(
1273         "Copying files across filesystems is not supported yet"));
1274   }
1275 
1276   // Azure
1277   if (old_uri.is_azure()) {
1278     if (new_uri.is_azure())
1279 #ifdef HAVE_AZURE
1280       return LOG_STATUS(Status::IOError(
1281           std::string("Copying files on Azure is not yet supported.")));
1282 #else
1283       return LOG_STATUS(
1284           Status::VFSError("TileDB was built without Azure support"));
1285 #endif
1286     return LOG_STATUS(Status::VFSError(
1287         "Copying files across filesystems is not supported yet"));
1288   }
1289 
1290   // GCS
1291   if (old_uri.is_gcs()) {
1292     if (new_uri.is_gcs())
1293 #ifdef HAVE_GCS
1294       return LOG_STATUS(Status::IOError(
1295           std::string("Copying files on GCS is not yet supported.")));
1296 #else
1297       return LOG_STATUS(
1298           Status::VFSError("TileDB was built without GCS support"));
1299 #endif
1300     return LOG_STATUS(Status::VFSError(
1301         "Copying files across filesystems is not supported yet"));
1302   }
1303 
1304   // Unsupported filesystem
1305   return LOG_STATUS(Status::VFSError(
1306       "Unsupported URI schemes: " + old_uri.to_string() + ", " +
1307       new_uri.to_string()));
1308 }
1309 
copy_dir(const URI & old_uri,const URI & new_uri)1310 Status VFS::copy_dir(const URI& old_uri, const URI& new_uri) {
1311   if (!init_)
1312     return LOG_STATUS(
1313         Status::VFSError("Cannot copy directory; VFS not initialized"));
1314 
1315   // File
1316   if (old_uri.is_file()) {
1317     if (new_uri.is_file()) {
1318 #ifdef _WIN32
1319       return LOG_STATUS(Status::IOError(
1320           std::string("Copying directories on Windows is not yet supported.")));
1321 #else
1322       return posix_.copy_dir(old_uri.to_path(), new_uri.to_path());
1323 #endif
1324     }
1325     return LOG_STATUS(Status::VFSError(
1326         "Copying directories across filesystems is not supported yet"));
1327   }
1328 
1329   // HDFS
1330   if (old_uri.is_hdfs()) {
1331     if (new_uri.is_hdfs())
1332 #ifdef HAVE_HDFS
1333       return LOG_STATUS(Status::IOError(
1334           std::string("Copying directories on HDFS is not yet supported.")));
1335 #else
1336       return LOG_STATUS(
1337           Status::VFSError("TileDB was built without HDFS support"));
1338 #endif
1339     return LOG_STATUS(Status::VFSError(
1340         "Copying directories across filesystems is not supported yet"));
1341   }
1342 
1343   // S3
1344   if (old_uri.is_s3()) {
1345     if (new_uri.is_s3())
1346 #ifdef HAVE_S3
1347       return s3_.copy_dir(old_uri, new_uri);
1348 #else
1349       return LOG_STATUS(
1350           Status::VFSError("TileDB was built without S3 support"));
1351 #endif
1352     return LOG_STATUS(Status::VFSError(
1353         "Copying directories across filesystems is not supported yet"));
1354   }
1355 
1356   // Azure
1357   if (old_uri.is_azure()) {
1358     if (new_uri.is_azure())
1359 #ifdef HAVE_AZURE
1360       return LOG_STATUS(Status::IOError(
1361           std::string("Copying directories on Azure is not yet supported.")));
1362 #else
1363       return LOG_STATUS(
1364           Status::VFSError("TileDB was built without Azure support"));
1365 #endif
1366     return LOG_STATUS(Status::VFSError(
1367         "Copying directories across filesystems is not supported yet"));
1368   }
1369 
1370   // GCS
1371   if (old_uri.is_gcs()) {
1372     if (new_uri.is_gcs())
1373 #ifdef HAVE_GCS
1374       return LOG_STATUS(Status::IOError(
1375           std::string("Copying directories on GCS is not yet supported.")));
1376 #else
1377       return LOG_STATUS(
1378           Status::VFSError("TileDB was built without GCS support"));
1379 #endif
1380     return LOG_STATUS(Status::VFSError(
1381         "Copying directories across filesystems is not supported yet"));
1382   }
1383 
1384   // Unsupported filesystem
1385   return LOG_STATUS(Status::VFSError(
1386       "Unsupported URI schemes: " + old_uri.to_string() + ", " +
1387       new_uri.to_string()));
1388 }
1389 
read(const URI & uri,const uint64_t offset,void * const buffer,const uint64_t nbytes,bool use_read_ahead)1390 Status VFS::read(
1391     const URI& uri,
1392     const uint64_t offset,
1393     void* const buffer,
1394     const uint64_t nbytes,
1395     bool use_read_ahead) {
1396   stats_->add_counter("read_byte_num", nbytes);
1397 
1398   if (!init_)
1399     return LOG_STATUS(Status::VFSError("Cannot read; VFS not initialized"));
1400 
1401   // Get config params
1402   bool found;
1403   uint64_t min_parallel_size = 0;
1404   RETURN_NOT_OK(config_.get<uint64_t>(
1405       "vfs.min_parallel_size", &min_parallel_size, &found));
1406   assert(found);
1407   uint64_t max_ops = 0;
1408   RETURN_NOT_OK(max_parallel_ops(uri, &max_ops));
1409 
1410   // Ensure that each thread is responsible for at least min_parallel_size
1411   // bytes, and cap the number of parallel operations at the configured maximum
1412   // number.
1413   uint64_t num_ops =
1414       std::min(std::max(nbytes / min_parallel_size, uint64_t(1)), max_ops);
1415 
1416   if (num_ops == 1) {
1417     return read_impl(uri, offset, buffer, nbytes, use_read_ahead);
1418   } else {
1419     // we don't want read-ahead when performing random access reads
1420     use_read_ahead = false;
1421     std::vector<ThreadPool::Task> results;
1422     uint64_t thread_read_nbytes = utils::math::ceil(nbytes, num_ops);
1423 
1424     for (uint64_t i = 0; i < num_ops; i++) {
1425       uint64_t begin = i * thread_read_nbytes,
1426                end = std::min((i + 1) * thread_read_nbytes - 1, nbytes - 1);
1427       uint64_t thread_nbytes = end - begin + 1;
1428       uint64_t thread_offset = offset + begin;
1429       auto thread_buffer = reinterpret_cast<char*>(buffer) + begin;
1430       auto task = cancelable_tasks_.execute(
1431           io_tp_,
1432           [this,
1433            uri,
1434            thread_offset,
1435            thread_buffer,
1436            thread_nbytes,
1437            use_read_ahead]() {
1438             return read_impl(
1439                 uri,
1440                 thread_offset,
1441                 thread_buffer,
1442                 thread_nbytes,
1443                 use_read_ahead);
1444           });
1445       results.push_back(std::move(task));
1446     }
1447     Status st = io_tp_->wait_all(results);
1448     if (!st.ok()) {
1449       std::stringstream errmsg;
1450       errmsg << "VFS parallel read error '" << uri.to_string() << "'; "
1451              << st.message();
1452       return LOG_STATUS(Status::VFSError(errmsg.str()));
1453     }
1454     return st;
1455   }
1456 }
1457 
read_impl(const URI & uri,const uint64_t offset,void * const buffer,const uint64_t nbytes,const bool use_read_ahead)1458 Status VFS::read_impl(
1459     const URI& uri,
1460     const uint64_t offset,
1461     void* const buffer,
1462     const uint64_t nbytes,
1463     const bool use_read_ahead) {
1464   stats_->add_counter("read_ops_num", 1);
1465 
1466   // We only check to use the read-ahead cache for cloud-storage
1467   // backends. No-op the `use_read_ahead` to prevent the unused
1468   // variable compiler warning.
1469   (void)use_read_ahead;
1470 
1471   if (uri.is_file()) {
1472 #ifdef _WIN32
1473     return win_.read(uri.to_path(), offset, buffer, nbytes);
1474 #else
1475     return posix_.read(uri.to_path(), offset, buffer, nbytes);
1476 #endif
1477   }
1478   if (uri.is_hdfs()) {
1479 #ifdef HAVE_HDFS
1480     return hdfs_->read(uri, offset, buffer, nbytes);
1481 #else
1482     return LOG_STATUS(
1483         Status::VFSError("TileDB was built without HDFS support"));
1484 #endif
1485   }
1486   if (uri.is_s3()) {
1487 #ifdef HAVE_S3
1488     const auto read_fn = std::bind(
1489         &S3::read,
1490         &s3_,
1491         std::placeholders::_1,
1492         std::placeholders::_2,
1493         std::placeholders::_3,
1494         std::placeholders::_4,
1495         std::placeholders::_5,
1496         std::placeholders::_6);
1497     return read_ahead_impl(
1498         read_fn, uri, offset, buffer, nbytes, use_read_ahead);
1499 #else
1500     return LOG_STATUS(Status::VFSError("TileDB was built without S3 support"));
1501 #endif
1502   }
1503   if (uri.is_azure()) {
1504 #ifdef HAVE_AZURE
1505     const auto read_fn = std::bind(
1506         &Azure::read,
1507         &azure_,
1508         std::placeholders::_1,
1509         std::placeholders::_2,
1510         std::placeholders::_3,
1511         std::placeholders::_4,
1512         std::placeholders::_5,
1513         std::placeholders::_6);
1514     return read_ahead_impl(
1515         read_fn, uri, offset, buffer, nbytes, use_read_ahead);
1516 #else
1517     return LOG_STATUS(
1518         Status::VFSError("TileDB was built without Azure support"));
1519 #endif
1520   }
1521   if (uri.is_gcs()) {
1522 #ifdef HAVE_GCS
1523     const auto read_fn = std::bind(
1524         &GCS::read,
1525         &gcs_,
1526         std::placeholders::_1,
1527         std::placeholders::_2,
1528         std::placeholders::_3,
1529         std::placeholders::_4,
1530         std::placeholders::_5,
1531         std::placeholders::_6);
1532     return read_ahead_impl(
1533         read_fn, uri, offset, buffer, nbytes, use_read_ahead);
1534 #else
1535     return LOG_STATUS(Status::VFSError("TileDB was built without GCS support"));
1536 #endif
1537   }
1538   if (uri.is_memfs()) {
1539     return memfs_.read(uri.to_path(), offset, buffer, nbytes);
1540   }
1541 
1542   return LOG_STATUS(
1543       Status::VFSError("Unsupported URI schemes: " + uri.to_string()));
1544 }
1545 
read_ahead_impl(const std::function<Status (const URI &,off_t,void *,uint64_t,uint64_t,uint64_t *)> & read_fn,const URI & uri,const uint64_t offset,void * const buffer,const uint64_t nbytes,const bool use_read_ahead)1546 Status VFS::read_ahead_impl(
1547     const std::function<Status(
1548         const URI&, off_t, void*, uint64_t, uint64_t, uint64_t*)>& read_fn,
1549     const URI& uri,
1550     const uint64_t offset,
1551     void* const buffer,
1552     const uint64_t nbytes,
1553     const bool use_read_ahead) {
1554   // Stores the total number of bytes read.
1555   uint64_t nbytes_read = 0;
1556 
1557   // Do not use the read-ahead cache if disabled by the caller.
1558   if (!use_read_ahead)
1559     return read_fn(uri, offset, buffer, nbytes, 0, &nbytes_read);
1560 
1561   // Only perform a read-ahead if the requested read size
1562   // is smaller than the size of the buffers in the read-ahead
1563   // cache. This is because:
1564   // 1. The read-ahead is primarily beneficial for IO patterns
1565   //    that consist of numerous small reads.
1566   // 2. Large reads may evict cached buffers that would be useful
1567   //    to a future small read.
1568   // 3. It saves us a copy. We must make a copy of the buffer at
1569   //    some point (one for the user, one for the cache).
1570   if (nbytes >= read_ahead_size_)
1571     return read_fn(uri, offset, buffer, nbytes, 0, &nbytes_read);
1572 
1573   // Avoid a read if the requested buffer can be read from the
1574   // read cache. Note that we intentionally do not use a read
1575   // cache for local files because we rely on the operating
1576   // system's file system to cache readahead data in memory.
1577   // Additionally, we do not perform readahead with HDFS.
1578   bool success;
1579   RETURN_NOT_OK(read_ahead_cache_->read(uri, offset, buffer, nbytes, &success));
1580   if (success)
1581     return Status::Ok();
1582 
1583   // We will read directly into the read-ahead buffer and then copy
1584   // the subrange of this buffer back to the user to satisfy the
1585   // read request.
1586   Buffer ra_buffer;
1587   RETURN_NOT_OK(ra_buffer.realloc(read_ahead_size_));
1588 
1589   // Calculate the exact number of bytes to populate `ra_buffer`
1590   // with `read_ahead_size_` bytes.
1591   const uint64_t ra_nbytes = read_ahead_size_ - nbytes;
1592 
1593   // Read into `ra_buffer`.
1594   RETURN_NOT_OK(
1595       read_fn(uri, offset, ra_buffer.data(), nbytes, ra_nbytes, &nbytes_read));
1596 
1597   // Copy the requested read range back into the caller's output `buffer`.
1598   assert(nbytes_read >= nbytes);
1599   std::memcpy(buffer, ra_buffer.data(), nbytes);
1600 
1601   // Cache `ra_buffer` at `offset`.
1602   ra_buffer.set_size(nbytes_read);
1603   RETURN_NOT_OK(read_ahead_cache_->insert(uri, offset, std::move(ra_buffer)));
1604 
1605   return Status::Ok();
1606 }
1607 
read_all(const URI & uri,const std::vector<std::tuple<uint64_t,void *,uint64_t>> & regions,ThreadPool * thread_pool,std::vector<ThreadPool::Task> * tasks,const bool use_read_ahead)1608 Status VFS::read_all(
1609     const URI& uri,
1610     const std::vector<std::tuple<uint64_t, void*, uint64_t>>& regions,
1611     ThreadPool* thread_pool,
1612     std::vector<ThreadPool::Task>* tasks,
1613     const bool use_read_ahead) {
1614   if (!init_)
1615     return LOG_STATUS(Status::VFSError("Cannot read all; VFS not initialized"));
1616 
1617   if (regions.empty())
1618     return Status::Ok();
1619 
1620   // Convert the individual regions into batched regions.
1621   std::vector<BatchedRead> batches;
1622   RETURN_NOT_OK(compute_read_batches(regions, &batches));
1623 
1624   // Read all the batches and copy to the original destinations.
1625   for (const auto& batch : batches) {
1626     URI uri_copy = uri;
1627     BatchedRead batch_copy = batch;
1628     auto task =
1629         thread_pool->execute([this, uri_copy, batch_copy, use_read_ahead]() {
1630           Buffer buffer;
1631           RETURN_NOT_OK(buffer.realloc(batch_copy.nbytes));
1632           RETURN_NOT_OK(read(
1633               uri_copy,
1634               batch_copy.offset,
1635               buffer.data(),
1636               batch_copy.nbytes,
1637               use_read_ahead));
1638           // Parallel copy back into the individual destinations.
1639           for (uint64_t i = 0; i < batch_copy.regions.size(); i++) {
1640             const auto& region = batch_copy.regions[i];
1641             uint64_t offset = std::get<0>(region);
1642             void* dest = std::get<1>(region);
1643             uint64_t nbytes = std::get<2>(region);
1644             std::memcpy(dest, buffer.data(offset - batch_copy.offset), nbytes);
1645           }
1646 
1647           return Status::Ok();
1648         });
1649 
1650     tasks->push_back(std::move(task));
1651   }
1652 
1653   return Status::Ok();
1654 }
1655 
compute_read_batches(const std::vector<std::tuple<uint64_t,void *,uint64_t>> & regions,std::vector<BatchedRead> * batches) const1656 Status VFS::compute_read_batches(
1657     const std::vector<std::tuple<uint64_t, void*, uint64_t>>& regions,
1658     std::vector<BatchedRead>* batches) const {
1659   // Get config params
1660   bool found;
1661   uint64_t min_batch_size = 0;
1662   RETURN_NOT_OK(
1663       config_.get<uint64_t>("vfs.min_batch_size", &min_batch_size, &found));
1664   assert(found);
1665   uint64_t min_batch_gap = 0;
1666   RETURN_NOT_OK(
1667       config_.get<uint64_t>("vfs.min_batch_gap", &min_batch_gap, &found));
1668   assert(found);
1669 
1670   // Ensure the regions are sorted on offset.
1671   std::vector<std::tuple<uint64_t, void*, uint64_t>> sorted_regions(
1672       regions.begin(), regions.end());
1673   parallel_sort(
1674       compute_tp_,
1675       sorted_regions.begin(),
1676       sorted_regions.end(),
1677       [](const std::tuple<uint64_t, void*, uint64_t>& a,
1678          const std::tuple<uint64_t, void*, uint64_t>& b) {
1679         return std::get<0>(a) < std::get<0>(b);
1680       });
1681 
1682   // Start the first batch containing only the first region.
1683   BatchedRead curr_batch(sorted_regions.front());
1684   uint64_t curr_batch_useful_bytes = curr_batch.nbytes;
1685   for (uint64_t i = 1; i < sorted_regions.size(); i++) {
1686     const auto& region = sorted_regions[i];
1687     uint64_t offset = std::get<0>(region);
1688     uint64_t nbytes = std::get<2>(region);
1689     uint64_t new_batch_size = (offset + nbytes) - curr_batch.offset;
1690     uint64_t gap = offset - (curr_batch.offset + curr_batch.nbytes);
1691     if (new_batch_size <= min_batch_size || gap <= min_batch_gap) {
1692       // Extend current batch.
1693       curr_batch.nbytes = new_batch_size;
1694       curr_batch.regions.push_back(region);
1695       curr_batch_useful_bytes += nbytes;
1696     } else {
1697       // Push the old batch and start a new one.
1698       batches->push_back(curr_batch);
1699       curr_batch.offset = offset;
1700       curr_batch.nbytes = nbytes;
1701       curr_batch.regions.clear();
1702       curr_batch.regions.push_back(region);
1703       curr_batch_useful_bytes = nbytes;
1704     }
1705   }
1706 
1707   // Push the last batch
1708   batches->push_back(curr_batch);
1709 
1710   return Status::Ok();
1711 }
1712 
supports_fs(Filesystem fs) const1713 bool VFS::supports_fs(Filesystem fs) const {
1714   return (supported_fs_.find(fs) != supported_fs_.end());
1715 }
1716 
supports_uri_scheme(const URI & uri) const1717 bool VFS::supports_uri_scheme(const URI& uri) const {
1718   if (uri.is_s3()) {
1719     return supports_fs(Filesystem::S3);
1720   } else if (uri.is_azure()) {
1721     return supports_fs(Filesystem::AZURE);
1722   } else if (uri.is_gcs()) {
1723     return supports_fs(Filesystem::GCS);
1724   } else if (uri.is_hdfs()) {
1725     return supports_fs(Filesystem::HDFS);
1726   } else {
1727     return true;
1728   }
1729 }
1730 
sync(const URI & uri)1731 Status VFS::sync(const URI& uri) {
1732   if (!init_)
1733     return LOG_STATUS(Status::VFSError("Cannot sync; VFS not initialized"));
1734 
1735   if (uri.is_file()) {
1736 #ifdef _WIN32
1737     return win_.sync(uri.to_path());
1738 #else
1739     return posix_.sync(uri.to_path());
1740 #endif
1741   }
1742   if (uri.is_hdfs()) {
1743 #ifdef HAVE_HDFS
1744     return hdfs_->sync(uri);
1745 #else
1746     return LOG_STATUS(
1747         Status::VFSError("TileDB was built without HDFS support"));
1748 #endif
1749   }
1750   if (uri.is_s3()) {
1751 #ifdef HAVE_S3
1752     return Status::Ok();
1753 #else
1754     return LOG_STATUS(Status::VFSError("TileDB was built without S3 support"));
1755 #endif
1756   }
1757   if (uri.is_azure()) {
1758 #ifdef HAVE_AZURE
1759     return Status::Ok();
1760 #else
1761     return LOG_STATUS(
1762         Status::VFSError("TileDB was built without Azure support"));
1763 #endif
1764   }
1765   if (uri.is_gcs()) {
1766 #ifdef HAVE_GCS
1767     return Status::Ok();
1768 #else
1769     return LOG_STATUS(Status::VFSError("TileDB was built without GCS support"));
1770 #endif
1771   }
1772   if (uri.is_memfs()) {
1773     return Status::Ok();
1774   }
1775   return LOG_STATUS(
1776       Status::VFSError("Unsupported URI scheme: " + uri.to_string()));
1777 }
1778 
open_file(const URI & uri,VFSMode mode)1779 Status VFS::open_file(const URI& uri, VFSMode mode) {
1780   if (!init_)
1781     return LOG_STATUS(
1782         Status::VFSError("Cannot open file; VFS not initialized"));
1783 
1784   bool is_file;
1785   RETURN_NOT_OK(this->is_file(uri, &is_file));
1786 
1787   switch (mode) {
1788     case VFSMode::VFS_READ:
1789       if (!is_file)
1790         return LOG_STATUS(Status::VFSError(
1791             std::string("Cannot open file '") + uri.c_str() +
1792             "'; File does not exist"));
1793       break;
1794     case VFSMode::VFS_WRITE:
1795       if (is_file)
1796         RETURN_NOT_OK(remove_file(uri));
1797       break;
1798     case VFSMode::VFS_APPEND:
1799       if (uri.is_s3()) {
1800 #ifdef HAVE_S3
1801         return LOG_STATUS(Status::VFSError(
1802             std::string("Cannot open file '") + uri.c_str() +
1803             "'; S3 does not support append mode"));
1804 #else
1805         return LOG_STATUS(Status::VFSError(
1806             "Cannot open file; TileDB was built without S3 support"));
1807 #endif
1808       }
1809       if (uri.is_azure()) {
1810 #ifdef HAVE_AZURE
1811         return LOG_STATUS(Status::VFSError(
1812             std::string("Cannot open file '") + uri.c_str() +
1813             "'; Azure does not support append mode"));
1814 #else
1815         return LOG_STATUS(Status::VFSError(
1816             "Cannot open file; TileDB was built without Azure support"));
1817 #endif
1818       }
1819       if (uri.is_gcs()) {
1820 #ifdef HAVE_GCS
1821         return LOG_STATUS(Status::VFSError(
1822             std::string("Cannot open file '") + uri.c_str() +
1823             "'; GCS does not support append mode"));
1824 #else
1825         return LOG_STATUS(Status::VFSError(
1826             "Cannot open file; TileDB was built without GCS support"));
1827 #endif
1828       }
1829       break;
1830   }
1831 
1832   return Status::Ok();
1833 }
1834 
close_file(const URI & uri)1835 Status VFS::close_file(const URI& uri) {
1836   if (!init_)
1837     return LOG_STATUS(
1838         Status::VFSError("Cannot close file; VFS not initialized"));
1839 
1840   if (uri.is_file()) {
1841 #ifdef _WIN32
1842     return win_.sync(uri.to_path());
1843 #else
1844     return posix_.sync(uri.to_path());
1845 #endif
1846   }
1847   if (uri.is_hdfs()) {
1848 #ifdef HAVE_HDFS
1849     return hdfs_->sync(uri);
1850 #else
1851     return LOG_STATUS(
1852         Status::VFSError("TileDB was built without HDFS support"));
1853 #endif
1854   }
1855   if (uri.is_s3()) {
1856 #ifdef HAVE_S3
1857     return s3_.flush_object(uri);
1858 #else
1859     return LOG_STATUS(Status::VFSError("TileDB was built without S3 support"));
1860 #endif
1861   }
1862   if (uri.is_azure()) {
1863 #ifdef HAVE_AZURE
1864     return azure_.flush_blob(uri);
1865 #else
1866     return LOG_STATUS(
1867         Status::VFSError("TileDB was built without Azure support"));
1868 #endif
1869   }
1870   if (uri.is_gcs()) {
1871 #ifdef HAVE_GCS
1872     return gcs_.flush_object(uri);
1873 #else
1874     return LOG_STATUS(Status::VFSError("TileDB was built without GCS support"));
1875 #endif
1876   }
1877   if (uri.is_memfs()) {
1878     return Status::Ok();
1879   }
1880   return LOG_STATUS(
1881       Status::VFSError("Unsupported URI schemes: " + uri.to_string()));
1882 }
1883 
write(const URI & uri,const void * buffer,uint64_t buffer_size)1884 Status VFS::write(const URI& uri, const void* buffer, uint64_t buffer_size) {
1885   stats_->add_counter("write_byte_num", buffer_size);
1886   stats_->add_counter("write_ops_num", 1);
1887 
1888   if (!init_)
1889     return LOG_STATUS(Status::VFSError("Cannot write; VFS not initialized"));
1890 
1891   if (uri.is_file()) {
1892 #ifdef _WIN32
1893     return win_.write(uri.to_path(), buffer, buffer_size);
1894 #else
1895     return posix_.write(uri.to_path(), buffer, buffer_size);
1896 #endif
1897   }
1898   if (uri.is_hdfs()) {
1899 #ifdef HAVE_HDFS
1900     return hdfs_->write(uri, buffer, buffer_size);
1901 #else
1902     return LOG_STATUS(
1903         Status::VFSError("TileDB was built without HDFS support"));
1904 #endif
1905   }
1906   if (uri.is_s3()) {
1907 #ifdef HAVE_S3
1908     return s3_.write(uri, buffer, buffer_size);
1909 #else
1910     return LOG_STATUS(Status::VFSError("TileDB was built without S3 support"));
1911 #endif
1912   }
1913   if (uri.is_azure()) {
1914 #ifdef HAVE_AZURE
1915     return azure_.write(uri, buffer, buffer_size);
1916 #else
1917     return LOG_STATUS(
1918         Status::VFSError("TileDB was built without Azure support"));
1919 #endif
1920   }
1921   if (uri.is_gcs()) {
1922 #ifdef HAVE_GCS
1923     return gcs_.write(uri, buffer, buffer_size);
1924 #else
1925     return LOG_STATUS(Status::VFSError("TileDB was built without GCS support"));
1926 #endif
1927   }
1928   if (uri.is_memfs()) {
1929     return memfs_.write(uri.to_path(), buffer, buffer_size);
1930   }
1931   return LOG_STATUS(
1932       Status::VFSError("Unsupported URI schemes: " + uri.to_string()));
1933 }
1934 
1935 }  // namespace sm
1936 }  // namespace tiledb
1937