1 /**
2 * @file vfs.cc
3 *
4 * @section LICENSE
5 *
6 * The MIT License
7 *
8 * @copyright Copyright (c) 2017-2021 TileDB, Inc.
9 *
10 * Permission is hereby granted, free of charge, to any person obtaining a copy
11 * of this software and associated documentation files (the "Software"), to deal
12 * in the Software without restriction, including without limitation the rights
13 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
14 * copies of the Software, and to permit persons to whom the Software is
15 * furnished to do so, subject to the following conditions:
16 *
17 * The above copyright notice and this permission notice shall be included in
18 * all copies or substantial portions of the Software.
19 *
20 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
21 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
22 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
23 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
24 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
25 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
26 * THE SOFTWARE.
27 *
28 * @section DESCRIPTION
29 *
30 * This file implements the VFS class.
31 */
32
33 #include "tiledb/sm/filesystem/vfs.h"
34 #include "tiledb/common/heap_memory.h"
35 #include "tiledb/common/logger.h"
36 #include "tiledb/sm/buffer/buffer.h"
37 #include "tiledb/sm/enums/filesystem.h"
38 #include "tiledb/sm/enums/vfs_mode.h"
39 #include "tiledb/sm/filesystem/hdfs_filesystem.h"
40 #include "tiledb/sm/misc/parallel_functions.h"
41 #include "tiledb/sm/misc/utils.h"
42 #include "tiledb/sm/stats/global_stats.h"
43
44 #include <iostream>
45 #include <list>
46 #include <sstream>
47 #include <unordered_map>
48
49 using namespace tiledb::common;
50
51 namespace tiledb {
52 namespace sm {
53
54 /* ********************************* */
55 /* GLOBAL VARIABLES */
56 /* ********************************* */
57
/**
 * Map of file URI (in string form) -> pair of (number of current locks held
 * on that URI, the filelock handle that was obtained when the URI was first
 * locked). This is shared across the entire process.
 */
static std::unordered_map<std::string, std::pair<uint64_t, filelock_t>>
    process_filelocks_;

/** Mutex protecting the filelock process and the filelock counts map. */
static std::mutex filelock_mtx_;
67
68 /* ********************************* */
69 /* CONSTRUCTORS & DESTRUCTORS */
70 /* ********************************* */
71
VFS()72 VFS::VFS()
73 : stats_(nullptr)
74 , init_(false)
75 , read_ahead_size_(0)
76 , compute_tp_(nullptr)
77 , io_tp_(nullptr) {
78 #ifdef HAVE_AZURE
79 supported_fs_.insert(Filesystem::AZURE);
80 #endif
81 #ifdef HAVE_GCS
82 supported_fs_.insert(Filesystem::GCS);
83 #endif
84 #ifdef HAVE_HDFS
85 supported_fs_.insert(Filesystem::HDFS);
86 #endif
87 #ifdef HAVE_S3
88 supported_fs_.insert(Filesystem::S3);
89 #endif
90 supported_fs_.insert(Filesystem::MEMFS);
91 }
92
93 /* ********************************* */
94 /* API */
95 /* ********************************* */
96
abs_path(const std::string & path)97 std::string VFS::abs_path(const std::string& path) {
98 // workaround for older clang (llvm 3.5) compilers (issue #828)
99 std::string path_copy = path;
100 #ifdef _WIN32
101 if (Win::is_win_path(path))
102 return Win::uri_from_path(Win::abs_path(path));
103 else if (URI::is_file(path))
104 return Win::uri_from_path(Win::abs_path(Win::path_from_uri(path)));
105 #else
106 if (URI::is_file(path))
107 return Posix::abs_path(path);
108 #endif
109 if (URI::is_hdfs(path))
110 return path_copy;
111 if (URI::is_s3(path))
112 return path_copy;
113 if (URI::is_azure(path))
114 return path_copy;
115 if (URI::is_gcs(path))
116 return path_copy;
117 if (URI::is_memfs(path))
118 return path_copy;
119 // Certainly starts with "<resource>://" other than "file://"
120 return path_copy;
121 }
122
config() const123 Config VFS::config() const {
124 return config_;
125 }
126
create_dir(const URI & uri) const127 Status VFS::create_dir(const URI& uri) const {
128 if (!init_)
129 return LOG_STATUS(
130 Status::VFSError("Cannot create directory; VFS not initialized"));
131
132 if (!uri.is_s3() && !uri.is_azure() && !uri.is_gcs()) {
133 bool is_dir;
134 RETURN_NOT_OK(this->is_dir(uri, &is_dir));
135 if (is_dir)
136 return Status::Ok();
137 }
138
139 if (uri.is_file()) {
140 #ifdef _WIN32
141 return win_.create_dir(uri.to_path());
142 #else
143 return posix_.create_dir(uri.to_path());
144 #endif
145 }
146 if (uri.is_hdfs()) {
147 #ifdef HAVE_HDFS
148 return hdfs_->create_dir(uri);
149 #else
150 return LOG_STATUS(
151 Status::VFSError("TileDB was built without HDFS support"));
152 #endif
153 }
154 if (uri.is_s3()) {
155 #ifdef HAVE_S3
156 // It is a noop for S3
157 return Status::Ok();
158 #else
159 return LOG_STATUS(Status::VFSError("TileDB was built without S3 support"));
160 #endif
161 }
162 if (uri.is_azure()) {
163 #ifdef HAVE_AZURE
164 // It is a noop for Azure
165 return Status::Ok();
166 #else
167 return LOG_STATUS(
168 Status::VFSError("TileDB was built without Azure support"));
169 #endif
170 }
171 if (uri.is_gcs()) {
172 #ifdef HAVE_GCS
173 // It is a noop for GCS
174 return Status::Ok();
175 #else
176 return LOG_STATUS(Status::VFSError("TileDB was built without GCS support"));
177 #endif
178 }
179 if (uri.is_memfs()) {
180 return memfs_.create_dir(uri.to_path());
181 }
182 return LOG_STATUS(Status::VFSError(
183 std::string("Unsupported URI scheme: ") + uri.to_string()));
184 }
185
dir_size(const URI & dir_name,uint64_t * dir_size) const186 Status VFS::dir_size(const URI& dir_name, uint64_t* dir_size) const {
187 if (!init_)
188 return LOG_STATUS(
189 Status::VFSError("Cannot get directory size; VFS not initialized"));
190
191 // Sanity check
192 bool is_dir;
193 RETURN_NOT_OK(this->is_dir(dir_name, &is_dir));
194 if (!is_dir)
195 return LOG_STATUS(Status::VFSError(
196 std::string("Cannot get directory size; Input '") +
197 dir_name.to_string() + "' is not a directory"));
198
199 // Get all files in the tree rooted at `dir_name` and add their sizes
200 *dir_size = 0;
201 uint64_t size;
202 std::list<URI> to_ls;
203 bool is_file;
204 to_ls.push_front(dir_name);
205 do {
206 auto uri = to_ls.front();
207 to_ls.pop_front();
208 std::vector<URI> children;
209 RETURN_NOT_OK(ls(uri, &children));
210 for (const auto& child : children) {
211 RETURN_NOT_OK(this->is_file(child, &is_file));
212 if (is_file) {
213 RETURN_NOT_OK(file_size(child, &size));
214 *dir_size += size;
215 } else {
216 to_ls.push_back(child);
217 }
218 }
219 } while (!to_ls.empty());
220
221 return Status::Ok();
222 }
223
touch(const URI & uri) const224 Status VFS::touch(const URI& uri) const {
225 if (!init_)
226 return LOG_STATUS(
227 Status::VFSError("Cannot touch file; VFS not initialized"));
228
229 if (uri.is_file()) {
230 #ifdef _WIN32
231 return win_.touch(uri.to_path());
232 #else
233 return posix_.touch(uri.to_path());
234 #endif
235 }
236 if (uri.is_hdfs()) {
237 #ifdef HAVE_HDFS
238 return hdfs_->touch(uri);
239 #else
240 return LOG_STATUS(
241 Status::VFSError("TileDB was built without HDFS support"));
242 #endif
243 }
244 if (uri.is_s3()) {
245 #ifdef HAVE_S3
246 return s3_.touch(uri);
247 #else
248 return LOG_STATUS(Status::VFSError("TileDB was built without S3 support"));
249 #endif
250 }
251 if (uri.is_azure()) {
252 #ifdef HAVE_AZURE
253 return azure_.touch(uri);
254 #else
255 return LOG_STATUS(
256 Status::VFSError("TileDB was built without Azure support"));
257 #endif
258 }
259 if (uri.is_gcs()) {
260 #ifdef HAVE_GCS
261 return gcs_.touch(uri);
262 #else
263 return LOG_STATUS(Status::VFSError("TileDB was built without GCS support"));
264 #endif
265 }
266 if (uri.is_memfs()) {
267 return memfs_.touch(uri.to_path());
268 }
269 return LOG_STATUS(Status::VFSError(
270 std::string("Unsupported URI scheme: ") + uri.to_string()));
271 }
272
cancel_all_tasks()273 Status VFS::cancel_all_tasks() {
274 if (!init_)
275 return LOG_STATUS(
276 Status::VFSError("Cannot cancel all tasks; VFS not initialized"));
277
278 cancelable_tasks_.cancel_all_tasks();
279 return Status::Ok();
280 }
281
create_bucket(const URI & uri) const282 Status VFS::create_bucket(const URI& uri) const {
283 if (!init_)
284 return LOG_STATUS(
285 Status::VFSError("Cannot create bucket; VFS not initialized"));
286
287 if (uri.is_s3()) {
288 #ifdef HAVE_S3
289 return s3_.create_bucket(uri);
290 #else
291 (void)uri;
292 return LOG_STATUS(Status::VFSError(std::string("S3 is not supported")));
293 #endif
294 }
295 if (uri.is_azure()) {
296 #ifdef HAVE_AZURE
297 return azure_.create_container(uri);
298 #else
299 (void)uri;
300 return LOG_STATUS(Status::VFSError(std::string("Azure is not supported")));
301 #endif
302 }
303 if (uri.is_gcs()) {
304 #ifdef HAVE_GCS
305 return gcs_.create_bucket(uri);
306 #else
307 (void)uri;
308 return LOG_STATUS(Status::VFSError(std::string("GCS is not supported")));
309 #endif
310 }
311 return LOG_STATUS(Status::VFSError(
312 std::string("Cannot create bucket; Unsupported URI scheme: ") +
313 uri.to_string()));
314 }
315
remove_bucket(const URI & uri) const316 Status VFS::remove_bucket(const URI& uri) const {
317 if (!init_)
318 return LOG_STATUS(
319 Status::VFSError("Cannot remove bucket; VFS not initialized"));
320
321 if (uri.is_s3()) {
322 #ifdef HAVE_S3
323 return s3_.remove_bucket(uri);
324 #else
325 (void)uri;
326 return LOG_STATUS(Status::VFSError(std::string("S3 is not supported")));
327 #endif
328 }
329 if (uri.is_azure()) {
330 #ifdef HAVE_AZURE
331 return azure_.remove_container(uri);
332 #else
333 (void)uri;
334 return LOG_STATUS(Status::VFSError(std::string("Azure is not supported")));
335 #endif
336 }
337 if (uri.is_gcs()) {
338 #ifdef HAVE_GCS
339 return gcs_.remove_bucket(uri);
340 #else
341 (void)uri;
342 return LOG_STATUS(Status::VFSError(std::string("GCS is not supported")));
343 #endif
344 }
345 return LOG_STATUS(Status::VFSError(
346 std::string("Cannot remove bucket; Unsupported URI scheme: ") +
347 uri.to_string()));
348 }
349
empty_bucket(const URI & uri) const350 Status VFS::empty_bucket(const URI& uri) const {
351 if (!init_)
352 return LOG_STATUS(
353 Status::VFSError("Cannot empty bucket; VFS not "
354 "initialized"));
355
356 if (uri.is_s3()) {
357 #ifdef HAVE_S3
358 return s3_.empty_bucket(uri);
359 #else
360 (void)uri;
361 return LOG_STATUS(Status::VFSError(std::string("S3 is not supported")));
362 #endif
363 }
364 if (uri.is_azure()) {
365 #ifdef HAVE_AZURE
366 return azure_.empty_container(uri);
367 #else
368 (void)uri;
369 return LOG_STATUS(Status::VFSError(std::string("Azure is not supported")));
370 #endif
371 }
372 if (uri.is_gcs()) {
373 #ifdef HAVE_GCS
374 return gcs_.empty_bucket(uri);
375 #else
376 (void)uri;
377 return LOG_STATUS(Status::VFSError(std::string("GCS is not supported")));
378 #endif
379 }
380 return LOG_STATUS(Status::VFSError(
381 std::string("Cannot empty bucket; Unsupported URI scheme: ") +
382 uri.to_string()));
383 }
384
is_empty_bucket(const URI & uri,bool * is_empty) const385 Status VFS::is_empty_bucket(const URI& uri, bool* is_empty) const {
386 if (!init_)
387 return LOG_STATUS(
388 Status::VFSError("Cannot check if bucket is empty; "
389 "VFS not initialized"));
390
391 if (uri.is_s3()) {
392 #ifdef HAVE_S3
393 return s3_.is_empty_bucket(uri, is_empty);
394 #else
395 (void)uri;
396 (void)is_empty;
397 return LOG_STATUS(Status::VFSError(std::string("S3 is not supported")));
398 #endif
399 }
400 if (uri.is_azure()) {
401 #ifdef HAVE_AZURE
402 return azure_.is_empty_container(uri, is_empty);
403 #else
404 (void)uri;
405 (void)is_empty;
406 return LOG_STATUS(Status::VFSError(std::string("Azure is not supported")));
407 #endif
408 }
409 if (uri.is_gcs()) {
410 #ifdef HAVE_GCS
411 return gcs_.is_empty_bucket(uri, is_empty);
412 #else
413 (void)uri;
414 (void)is_empty;
415 return LOG_STATUS(Status::VFSError(std::string("GCS is not supported")));
416 #endif
417 }
418 return LOG_STATUS(Status::VFSError(
419 std::string("Cannot remove bucket; Unsupported URI scheme: ") +
420 uri.to_string()));
421 }
422
remove_dir(const URI & uri) const423 Status VFS::remove_dir(const URI& uri) const {
424 if (!init_)
425 return LOG_STATUS(
426 Status::VFSError("Cannot remove directory; VFS not "
427 "initialized"));
428
429 if (uri.is_file()) {
430 #ifdef _WIN32
431 return win_.remove_dir(uri.to_path());
432 #else
433 return posix_.remove_dir(uri.to_path());
434 #endif
435 } else if (uri.is_hdfs()) {
436 #ifdef HAVE_HDFS
437 return hdfs_->remove_dir(uri);
438 #else
439 return LOG_STATUS(
440 Status::VFSError("TileDB was built without HDFS support"));
441 #endif
442 } else if (uri.is_s3()) {
443 #ifdef HAVE_S3
444 return s3_.remove_dir(uri);
445 #else
446 return LOG_STATUS(Status::VFSError("TileDB was built without S3 support"));
447 #endif
448 } else if (uri.is_azure()) {
449 #ifdef HAVE_AZURE
450 return azure_.remove_dir(uri);
451 #else
452 return LOG_STATUS(
453 Status::VFSError("TileDB was built without Azure support"));
454 #endif
455 } else if (uri.is_gcs()) {
456 #ifdef HAVE_GCS
457 return gcs_.remove_dir(uri);
458 #else
459 return LOG_STATUS(Status::VFSError("TileDB was built without GCS support"));
460 #endif
461 } else if (uri.is_memfs()) {
462 return memfs_.remove(uri.to_path(), true);
463 } else {
464 return LOG_STATUS(
465 Status::VFSError("Unsupported URI scheme: " + uri.to_string()));
466 }
467 }
468
remove_file(const URI & uri) const469 Status VFS::remove_file(const URI& uri) const {
470 if (!init_)
471 return LOG_STATUS(
472 Status::VFSError("Cannot remove file; VFS not initialized"));
473
474 if (uri.is_file()) {
475 #ifdef _WIN32
476 return win_.remove_file(uri.to_path());
477 #else
478 return posix_.remove_file(uri.to_path());
479 #endif
480 }
481 if (uri.is_hdfs()) {
482 #ifdef HAVE_HDFS
483 return hdfs_->remove_file(uri);
484 #else
485 return LOG_STATUS(
486 Status::VFSError("TileDB was built without HDFS support"));
487 #endif
488 }
489 if (uri.is_s3()) {
490 #ifdef HAVE_S3
491 return s3_.remove_object(uri);
492 #else
493 return LOG_STATUS(Status::VFSError("TileDB was built without S3 support"));
494 #endif
495 }
496 if (uri.is_azure()) {
497 #ifdef HAVE_AZURE
498 return azure_.remove_blob(uri);
499 #else
500 return LOG_STATUS(
501 Status::VFSError("TileDB was built without Azure support"));
502 #endif
503 }
504 if (uri.is_gcs()) {
505 #ifdef HAVE_GCS
506 return gcs_.remove_object(uri);
507 #else
508 return LOG_STATUS(Status::VFSError("TileDB was built without GCS support"));
509 #endif
510 }
511 if (uri.is_memfs()) {
512 return memfs_.remove(uri.to_path(), false);
513 }
514 return LOG_STATUS(
515 Status::VFSError("Unsupported URI scheme: " + uri.to_string()));
516 }
517
filelock_lock(const URI & uri,filelock_t * lock,bool shared) const518 Status VFS::filelock_lock(const URI& uri, filelock_t* lock, bool shared) const {
519 if (!init_)
520 return LOG_STATUS(
521 Status::VFSError("Cannot lock filelock; VFS not initialized"));
522
523 // Get config
524 bool found = false;
525 bool enable_filelocks = false;
526 RETURN_NOT_OK(config_.get<bool>(
527 "vfs.file.enable_filelocks", &enable_filelocks, &found));
528 assert(found);
529
530 if (!enable_filelocks)
531 return Status::Ok();
532
533 // Hold the lock while updating counts and performing the lock.
534 std::unique_lock<std::mutex> lck(filelock_mtx_);
535
536 auto it = process_filelocks_.find(uri.to_string());
537 if (it != process_filelocks_.end()) {
538 it->second.first++;
539 // we need to return the lock for the xlock semantics
540 *lock = it->second.second;
541 return Status::Ok();
542 }
543
544 // We must hold the fd in the global map in order to free from any context
545 if (uri.is_file()) {
546 #ifdef _WIN32
547 auto st = win_.filelock_lock(uri.to_path(), lock, shared);
548 #else
549 auto st = posix_.filelock_lock(uri.to_path(), lock, shared);
550 #endif
551
552 if (st.ok())
553 process_filelocks_[uri.to_string()] = {1, *lock};
554 return st;
555 }
556
557 if (uri.is_memfs()) {
558 return Status::Ok();
559 }
560 if (uri.is_hdfs()) {
561 #ifdef HAVE_HDFS
562 return Status::Ok();
563 #else
564 return LOG_STATUS(
565 Status::VFSError("TileDB was built without HDFS support"));
566 #endif
567 }
568 if (uri.is_s3()) {
569 #ifdef HAVE_S3
570 return Status::Ok();
571 #else
572 return LOG_STATUS(Status::VFSError("TileDB was built without S3 support"));
573 #endif
574 }
575 if (uri.is_azure()) {
576 #ifdef HAVE_AZURE
577 return Status::Ok();
578 #else
579 return LOG_STATUS(
580 Status::VFSError("TileDB was built without Azure support"));
581 #endif
582 }
583 if (uri.is_gcs()) {
584 #ifdef HAVE_GCS
585 return Status::Ok();
586 #else
587 return LOG_STATUS(Status::VFSError("TileDB was built without GCS support"));
588 #endif
589 }
590 return LOG_STATUS(
591 Status::VFSError("Unsupported URI scheme: " + uri.to_string()));
592 }
593
filelock_unlock(const URI & uri) const594 Status VFS::filelock_unlock(const URI& uri) const {
595 if (!init_)
596 return LOG_STATUS(
597 Status::VFSError("Cannot unlock filelock; VFS not initialized"));
598
599 // Get config
600 bool found = false;
601 bool enable_filelocks = false;
602 RETURN_NOT_OK(config_.get<bool>(
603 "vfs.file.enable_filelocks", &enable_filelocks, &found));
604 assert(found);
605
606 if (!enable_filelocks)
607 return Status::Ok();
608
609 // Hold the lock while updating counts and performing the unlock.
610 std::unique_lock<std::mutex> lck(filelock_mtx_);
611
612 // Decrement the lock counter and return if the counter is still > 0.
613 bool should_unlock = false;
614 filelock_t fd = INVALID_FILELOCK;
615 Status st = decr_lock_count(uri, &should_unlock, &fd);
616 if (!st.ok() || !should_unlock) {
617 return st;
618 }
619
620 if (uri.is_file()) {
621 #ifdef _WIN32
622 return win_.filelock_unlock(fd);
623 #else
624 return posix_.filelock_unlock(fd);
625 #endif
626 }
627 if (uri.is_hdfs()) {
628 #ifdef HAVE_HDFS
629 return Status::Ok();
630 #else
631 return LOG_STATUS(
632 Status::VFSError("TileDB was built without HDFS support"));
633 #endif
634 }
635 if (uri.is_s3()) {
636 #ifdef HAVE_S3
637 return Status::Ok();
638 #else
639 return LOG_STATUS(Status::VFSError("TileDB was built without S3 support"));
640 #endif
641 }
642 if (uri.is_azure()) {
643 #ifdef HAVE_AZURE
644 return Status::Ok();
645 #else
646 return LOG_STATUS(
647 Status::VFSError("TileDB was built without Azure support"));
648 #endif
649 }
650 if (uri.is_gcs()) {
651 #ifdef HAVE_GCS
652 return Status::Ok();
653 #else
654 return LOG_STATUS(Status::VFSError("TileDB was built without GCS support"));
655 #endif
656 }
657 return LOG_STATUS(
658 Status::VFSError("Unsupported URI scheme: " + uri.to_string()));
659 }
660
decr_lock_count(const URI & uri,bool * is_zero,filelock_t * lock) const661 Status VFS::decr_lock_count(
662 const URI& uri, bool* is_zero, filelock_t* lock) const {
663 auto it = process_filelocks_.find(uri.to_string());
664 if (it == process_filelocks_.end()) {
665 return LOG_STATUS(
666 Status::VFSError("No lock counter for URI " + uri.to_string()));
667 } else if (it->second.first == 0) {
668 return LOG_STATUS(
669 Status::VFSError("Invalid lock count for URI " + uri.to_string()));
670 }
671
672 it->second.first--;
673
674 if (it->second.first == 0) {
675 *is_zero = true;
676 *lock = it->second.second;
677 process_filelocks_.erase(it);
678 } else {
679 *is_zero = false;
680 }
681 return Status::Ok();
682 }
683
max_parallel_ops(const URI & uri,uint64_t * ops) const684 Status VFS::max_parallel_ops(const URI& uri, uint64_t* ops) const {
685 if (!init_)
686 return LOG_STATUS(
687 Status::VFSError("Cannot get max parallel ops; VFS not initialized"));
688
689 bool found;
690 *ops = 0;
691
692 if (uri.is_file()) {
693 RETURN_NOT_OK(
694 config_.get<uint64_t>("vfs.file.max_parallel_ops", ops, &found));
695 assert(found);
696 } else if (uri.is_hdfs()) {
697 // HDFS backend is currently serial.
698 *ops = 1;
699 } else if (uri.is_s3()) {
700 RETURN_NOT_OK(
701 config_.get<uint64_t>("vfs.s3.max_parallel_ops", ops, &found));
702 assert(found);
703 } else if (uri.is_azure()) {
704 RETURN_NOT_OK(
705 config_.get<uint64_t>("vfs.azure.max_parallel_ops", ops, &found));
706 assert(found);
707 } else if (uri.is_gcs()) {
708 RETURN_NOT_OK(
709 config_.get<uint64_t>("vfs.gcs.max_parallel_ops", ops, &found));
710 assert(found);
711 } else {
712 *ops = 1;
713 }
714
715 return Status::Ok();
716 }
717
file_size(const URI & uri,uint64_t * size) const718 Status VFS::file_size(const URI& uri, uint64_t* size) const {
719 if (!init_)
720 return LOG_STATUS(
721 Status::VFSError("Cannot get file size; VFS not initialized"));
722
723 if (uri.is_file()) {
724 #ifdef _WIN32
725 return win_.file_size(uri.to_path(), size);
726 #else
727 return posix_.file_size(uri.to_path(), size);
728 #endif
729 }
730 if (uri.is_hdfs()) {
731 #ifdef HAVE_HDFS
732 return hdfs_->file_size(uri, size);
733 #else
734 return LOG_STATUS(
735 Status::VFSError("TileDB was built without HDFS support"));
736 #endif
737 }
738 if (uri.is_s3()) {
739 #ifdef HAVE_S3
740 return s3_.object_size(uri, size);
741 #else
742 return LOG_STATUS(Status::VFSError("TileDB was built without S3 support"));
743 #endif
744 }
745 if (uri.is_azure()) {
746 #ifdef HAVE_AZURE
747 return azure_.blob_size(uri, size);
748 #else
749 return LOG_STATUS(
750 Status::VFSError("TileDB was built without Azure support"));
751 #endif
752 }
753 if (uri.is_gcs()) {
754 #ifdef HAVE_GCS
755 return gcs_.object_size(uri, size);
756 #else
757 return LOG_STATUS(Status::VFSError("TileDB was built without GCS support"));
758 #endif
759 }
760 if (uri.is_memfs()) {
761 return memfs_.file_size(uri.to_path(), size);
762 }
763 return LOG_STATUS(
764 Status::VFSError("Unsupported URI scheme: " + uri.to_string()));
765 }
766
is_dir(const URI & uri,bool * is_dir) const767 Status VFS::is_dir(const URI& uri, bool* is_dir) const {
768 if (!init_)
769 return LOG_STATUS(
770 Status::VFSError("Cannot check directory; VFS not initialized"));
771
772 if (uri.is_file()) {
773 #ifdef _WIN32
774 *is_dir = win_.is_dir(uri.to_path());
775 #else
776 *is_dir = posix_.is_dir(uri.to_path());
777 #endif
778 return Status::Ok();
779 }
780 if (uri.is_hdfs()) {
781 #ifdef HAVE_HDFS
782 return hdfs_->is_dir(uri, is_dir);
783 #else
784 *is_dir = false;
785 return LOG_STATUS(
786 Status::VFSError("TileDB was built without HDFS support"));
787 #endif
788 }
789 if (uri.is_s3()) {
790 #ifdef HAVE_S3
791 return s3_.is_dir(uri, is_dir);
792 #else
793 *is_dir = false;
794 return LOG_STATUS(Status::VFSError("TileDB was built without S3 support"));
795 #endif
796 }
797 if (uri.is_azure()) {
798 #ifdef HAVE_AZURE
799 return azure_.is_dir(uri, is_dir);
800 #else
801 *is_dir = false;
802 return LOG_STATUS(
803 Status::VFSError("TileDB was built without Azure support"));
804 #endif
805 }
806 if (uri.is_gcs()) {
807 #ifdef HAVE_GCS
808 return gcs_.is_dir(uri, is_dir);
809 #else
810 *is_dir = false;
811 return LOG_STATUS(Status::VFSError("TileDB was built without GCS support"));
812 #endif
813 }
814 if (uri.is_memfs()) {
815 *is_dir = memfs_.is_dir(uri.to_path());
816 return Status::Ok();
817 }
818 return LOG_STATUS(
819 Status::VFSError("Unsupported URI scheme: " + uri.to_string()));
820 }
821
is_file(const URI & uri,bool * is_file) const822 Status VFS::is_file(const URI& uri, bool* is_file) const {
823 if (!init_)
824 return LOG_STATUS(
825 Status::VFSError("Cannot check file; VFS not initialized"));
826
827 if (uri.is_file()) {
828 #ifdef _WIN32
829 *is_file = win_.is_file(uri.to_path());
830 #else
831 *is_file = posix_.is_file(uri.to_path());
832 #endif
833 return Status::Ok();
834 }
835 if (uri.is_hdfs()) {
836 #ifdef HAVE_HDFS
837 return hdfs_->is_file(uri, is_file);
838 #else
839 *is_file = false;
840 return LOG_STATUS(
841 Status::VFSError("TileDB was built without HDFS support"));
842 #endif
843 }
844 if (uri.is_s3()) {
845 #ifdef HAVE_S3
846 RETURN_NOT_OK(s3_.is_object(uri, is_file));
847 return Status::Ok();
848 #else
849 *is_file = false;
850 return LOG_STATUS(Status::VFSError("TileDB was built without S3 support"));
851 #endif
852 }
853 if (uri.is_azure()) {
854 #ifdef HAVE_AZURE
855 return azure_.is_blob(uri, is_file);
856 #else
857 *is_file = false;
858 return LOG_STATUS(
859 Status::VFSError("TileDB was built without Azure support"));
860 #endif
861 }
862 if (uri.is_gcs()) {
863 #ifdef HAVE_GCS
864 return gcs_.is_object(uri, is_file);
865 #else
866 *is_file = false;
867 return LOG_STATUS(Status::VFSError("TileDB was built without GCS support"));
868 #endif
869 }
870 if (uri.is_memfs()) {
871 *is_file = memfs_.is_file(uri.to_path());
872 return Status::Ok();
873 }
874 return LOG_STATUS(
875 Status::VFSError("Unsupported URI scheme: " + uri.to_string()));
876 }
877
is_bucket(const URI & uri,bool * is_bucket) const878 Status VFS::is_bucket(const URI& uri, bool* is_bucket) const {
879 if (!init_)
880 return LOG_STATUS(
881 Status::VFSError("Cannot check bucket; VFS not initialized"));
882
883 if (uri.is_s3()) {
884 #ifdef HAVE_S3
885 RETURN_NOT_OK(s3_.is_bucket(uri, is_bucket));
886 return Status::Ok();
887 #else
888 *is_bucket = false;
889 return LOG_STATUS(Status::VFSError("TileDB was built without S3 support"));
890 #endif
891 }
892 if (uri.is_azure()) {
893 #ifdef HAVE_AZURE
894 RETURN_NOT_OK(azure_.is_container(uri, is_bucket));
895 return Status::Ok();
896 #else
897 *is_bucket = false;
898 return LOG_STATUS(
899 Status::VFSError("TileDB was built without Azure support"));
900 #endif
901 }
902 if (uri.is_gcs()) {
903 #ifdef HAVE_GCS
904 RETURN_NOT_OK(gcs_.is_bucket(uri, is_bucket));
905 return Status::Ok();
906 #else
907 *is_bucket = false;
908 return LOG_STATUS(Status::VFSError("TileDB was built without GCS support"));
909 #endif
910 }
911
912 return LOG_STATUS(
913 Status::VFSError("Unsupported URI scheme: " + uri.to_string()));
914 }
915
init(stats::Stats * const parent_stats,ThreadPool * const compute_tp,ThreadPool * const io_tp,const Config * const ctx_config,const Config * const vfs_config)916 Status VFS::init(
917 stats::Stats* const parent_stats,
918 ThreadPool* const compute_tp,
919 ThreadPool* const io_tp,
920 const Config* const ctx_config,
921 const Config* const vfs_config) {
922 stats_ = parent_stats->create_child("VFS");
923
924 assert(compute_tp);
925 assert(io_tp);
926 compute_tp_ = compute_tp;
927 io_tp_ = io_tp;
928
929 // Set appropriately the config
930 if (ctx_config)
931 config_ = *ctx_config;
932 if (vfs_config)
933 config_.inherit(*vfs_config);
934
935 // Construct the read-ahead cache.
936 bool found = false;
937 uint64_t read_ahead_cache_size = 0;
938 RETURN_NOT_OK(config_.get<uint64_t>(
939 "vfs.read_ahead_cache_size", &read_ahead_cache_size, &found));
940 assert(found);
941 read_ahead_cache_ = tdb_unique_ptr<ReadAheadCache>(
942 tdb_new(ReadAheadCache, read_ahead_cache_size));
943
944 // Store the read-ahead size.
945 RETURN_NOT_OK(
946 config_.get<uint64_t>("vfs.read_ahead_size", &read_ahead_size_, &found));
947 assert(found);
948
949 #ifdef HAVE_HDFS
950 hdfs_ = tdb_unique_ptr<hdfs::HDFS>(tdb_new(hdfs::HDFS));
951 RETURN_NOT_OK(hdfs_->init(config_));
952 #endif
953
954 #ifdef HAVE_S3
955 RETURN_NOT_OK(s3_.init(stats_, config_, io_tp_));
956 #endif
957
958 #ifdef HAVE_AZURE
959 RETURN_NOT_OK(azure_.init(config_, io_tp_));
960 #endif
961
962 #ifdef HAVE_GCS
963 Status st = gcs_.init(config_, io_tp_);
964 if (!st.ok()) {
965 // We should print some warning here, this LOG_STATUS only prints in
966 // verbose mode. Since this is called in the init of the context, we
967 // can't return the error through the normal set it on the context.
968 LOG_STATUS(Status::GCSError(
969 "GCS failed to initialize, GCS support will not be available in this "
970 "context: " +
971 st.message()));
972 }
973 #endif
974
975 #ifdef _WIN32
976 win_.init(config_, io_tp_);
977 #else
978 posix_.init(config_, io_tp_);
979 #endif
980
981 init_ = true;
982
983 return Status::Ok();
984 }
985
terminate()986 Status VFS::terminate() {
987 #ifdef HAVE_S3
988 return s3_.disconnect();
989 #endif
990
991 return Status::Ok();
992 }
993
ls(const URI & parent,std::vector<URI> * uris) const994 Status VFS::ls(const URI& parent, std::vector<URI>* uris) const {
995 if (!init_)
996 return LOG_STATUS(Status::VFSError("Cannot list; VFS not initialized"));
997
998 std::vector<std::string> paths;
999 if (parent.is_file()) {
1000 #ifdef _WIN32
1001 RETURN_NOT_OK(win_.ls(parent.to_path(), &paths));
1002 #else
1003 RETURN_NOT_OK(posix_.ls(parent.to_path(), &paths));
1004 #endif
1005 } else if (parent.is_hdfs()) {
1006 #ifdef HAVE_HDFS
1007 RETURN_NOT_OK(hdfs_->ls(parent, &paths));
1008 #else
1009 return LOG_STATUS(
1010 Status::VFSError("TileDB was built without HDFS support"));
1011 #endif
1012 } else if (parent.is_s3()) {
1013 #ifdef HAVE_S3
1014 RETURN_NOT_OK(s3_.ls(parent, &paths));
1015 #else
1016 return LOG_STATUS(Status::VFSError("TileDB was built without S3 support"));
1017 #endif
1018 } else if (parent.is_azure()) {
1019 #ifdef HAVE_AZURE
1020 RETURN_NOT_OK(azure_.ls(parent, &paths));
1021 #else
1022 return LOG_STATUS(
1023 Status::VFSError("TileDB was built without Azure support"));
1024 #endif
1025 } else if (parent.is_gcs()) {
1026 #ifdef HAVE_GCS
1027 RETURN_NOT_OK(gcs_.ls(parent, &paths));
1028 #else
1029 return LOG_STATUS(Status::VFSError("TileDB was built without GCS support"));
1030 #endif
1031 } else if (parent.is_memfs()) {
1032 RETURN_NOT_OK(memfs_.ls(parent.to_path(), &paths));
1033 // URI class expects paths to be prepended
1034 for (std::string& path : paths) {
1035 path.insert(0, "mem://");
1036 }
1037 } else {
1038 return LOG_STATUS(
1039 Status::VFSError("Unsupported URI scheme: " + parent.to_string()));
1040 }
1041 parallel_sort(compute_tp_, paths.begin(), paths.end());
1042 for (auto& path : paths) {
1043 uris->emplace_back(path);
1044 }
1045 return Status::Ok();
1046 }
1047
move_file(const URI & old_uri,const URI & new_uri)1048 Status VFS::move_file(const URI& old_uri, const URI& new_uri) {
1049 if (!init_)
1050 return LOG_STATUS(
1051 Status::VFSError("Cannot move file; VFS not initialized"));
1052
1053 // If new_uri exists, delete it or raise an error based on `force`
1054 bool is_file;
1055 RETURN_NOT_OK(this->is_file(new_uri, &is_file));
1056 if (is_file)
1057 RETURN_NOT_OK(remove_file(new_uri));
1058
1059 // File
1060 if (old_uri.is_file()) {
1061 if (new_uri.is_file()) {
1062 #ifdef _WIN32
1063 return win_.move_path(old_uri.to_path(), new_uri.to_path());
1064 #else
1065 return posix_.move_path(old_uri.to_path(), new_uri.to_path());
1066 #endif
1067 }
1068 return LOG_STATUS(Status::VFSError(
1069 "Moving files across filesystems is not supported yet"));
1070 }
1071
1072 // HDFS
1073 if (old_uri.is_hdfs()) {
1074 if (new_uri.is_hdfs())
1075 #ifdef HAVE_HDFS
1076 return hdfs_->move_path(old_uri, new_uri);
1077 #else
1078 return LOG_STATUS(
1079 Status::VFSError("TileDB was built without HDFS support"));
1080 #endif
1081 return LOG_STATUS(Status::VFSError(
1082 "Moving files across filesystems is not supported yet"));
1083 }
1084
1085 // S3
1086 if (old_uri.is_s3()) {
1087 if (new_uri.is_s3())
1088 #ifdef HAVE_S3
1089 return s3_.move_object(old_uri, new_uri);
1090 #else
1091 return LOG_STATUS(
1092 Status::VFSError("TileDB was built without S3 support"));
1093 #endif
1094 return LOG_STATUS(Status::VFSError(
1095 "Moving files across filesystems is not supported yet"));
1096 }
1097
1098 // Azure
1099 if (old_uri.is_azure()) {
1100 if (new_uri.is_azure())
1101 #ifdef HAVE_AZURE
1102 return azure_.move_object(old_uri, new_uri);
1103 #else
1104 return LOG_STATUS(
1105 Status::VFSError("TileDB was built without Azure support"));
1106 #endif
1107 return LOG_STATUS(Status::VFSError(
1108 "Moving files across filesystems is not supported yet"));
1109 }
1110
1111 // GCS
1112 if (old_uri.is_gcs()) {
1113 if (new_uri.is_gcs())
1114 #ifdef HAVE_GCS
1115 return gcs_.move_object(old_uri, new_uri);
1116 #else
1117 return LOG_STATUS(
1118 Status::VFSError("TileDB was built without GCS support"));
1119 #endif
1120 return LOG_STATUS(Status::VFSError(
1121 "Moving files across filesystems is not supported yet"));
1122 }
1123
1124 // In-memory filesystem
1125 if (old_uri.is_memfs()) {
1126 if (new_uri.is_memfs()) {
1127 return memfs_.move(old_uri.to_path(), new_uri.to_path());
1128 }
1129 return LOG_STATUS(Status::VFSError(
1130 "Moving files across filesystems is not supported yet"));
1131 }
1132
1133 // Unsupported filesystem
1134 return LOG_STATUS(Status::VFSError(
1135 "Unsupported URI schemes: " + old_uri.to_string() + ", " +
1136 new_uri.to_string()));
1137 }
1138
move_dir(const URI & old_uri,const URI & new_uri)1139 Status VFS::move_dir(const URI& old_uri, const URI& new_uri) {
1140 if (!init_)
1141 return LOG_STATUS(
1142 Status::VFSError("Cannot move directory; VFS not initialized"));
1143
1144 // File
1145 if (old_uri.is_file()) {
1146 if (new_uri.is_file()) {
1147 #ifdef _WIN32
1148 return win_.move_path(old_uri.to_path(), new_uri.to_path());
1149 #else
1150 return posix_.move_path(old_uri.to_path(), new_uri.to_path());
1151 #endif
1152 }
1153 return LOG_STATUS(Status::VFSError(
1154 "Moving files across filesystems is not supported yet"));
1155 }
1156
1157 // HDFS
1158 if (old_uri.is_hdfs()) {
1159 if (new_uri.is_hdfs())
1160 #ifdef HAVE_HDFS
1161 return hdfs_->move_path(old_uri, new_uri);
1162 #else
1163 return LOG_STATUS(
1164 Status::VFSError("TileDB was built without HDFS support"));
1165 #endif
1166 return LOG_STATUS(Status::VFSError(
1167 "Moving files across filesystems is not supported yet"));
1168 }
1169
1170 // S3
1171 if (old_uri.is_s3()) {
1172 if (new_uri.is_s3())
1173 #ifdef HAVE_S3
1174 return s3_.move_dir(old_uri, new_uri);
1175 #else
1176 return LOG_STATUS(
1177 Status::VFSError("TileDB was built without S3 support"));
1178 #endif
1179 return LOG_STATUS(Status::VFSError(
1180 "Moving files across filesystems is not supported yet"));
1181 }
1182
1183 // Azure
1184 if (old_uri.is_azure()) {
1185 if (new_uri.is_azure())
1186 #ifdef HAVE_AZURE
1187 return azure_.move_dir(old_uri, new_uri);
1188 #else
1189 return LOG_STATUS(
1190 Status::VFSError("TileDB was built without Azure support"));
1191 #endif
1192 return LOG_STATUS(Status::VFSError(
1193 "Moving files across filesystems is not supported yet"));
1194 }
1195
1196 // GCS
1197 if (old_uri.is_gcs()) {
1198 if (new_uri.is_gcs())
1199 #ifdef HAVE_GCS
1200 return gcs_.move_dir(old_uri, new_uri);
1201 #else
1202 return LOG_STATUS(
1203 Status::VFSError("TileDB was built without GCS support"));
1204 #endif
1205 return LOG_STATUS(Status::VFSError(
1206 "Moving files across filesystems is not supported yet"));
1207 }
1208
1209 // In-memory filesystem
1210 if (old_uri.is_memfs()) {
1211 if (new_uri.is_memfs()) {
1212 return memfs_.move(old_uri.to_path(), new_uri.to_path());
1213 }
1214 return LOG_STATUS(Status::VFSError(
1215 "Moving files across filesystems is not supported yet"));
1216 }
1217
1218 // Unsupported filesystem
1219 return LOG_STATUS(Status::VFSError(
1220 "Unsupported URI schemes: " + old_uri.to_string() + ", " +
1221 new_uri.to_string()));
1222 }
1223
copy_file(const URI & old_uri,const URI & new_uri)1224 Status VFS::copy_file(const URI& old_uri, const URI& new_uri) {
1225 if (!init_)
1226 return LOG_STATUS(
1227 Status::VFSError("Cannot copy file; VFS not initialized"));
1228
1229 // If new_uri exists, delete it or raise an error based on `force`
1230 bool is_file;
1231 RETURN_NOT_OK(this->is_file(new_uri, &is_file));
1232 if (is_file)
1233 RETURN_NOT_OK(remove_file(new_uri));
1234
1235 // File
1236 if (old_uri.is_file()) {
1237 if (new_uri.is_file()) {
1238 #ifdef _WIN32
1239 return LOG_STATUS(Status::IOError(
1240 std::string("Copying files on Windows is not yet supported.")));
1241 #else
1242 return posix_.copy_file(old_uri.to_path(), new_uri.to_path());
1243 #endif
1244 }
1245 return LOG_STATUS(Status::VFSError(
1246 "Copying files across filesystems is not supported yet"));
1247 }
1248
1249 // HDFS
1250 if (old_uri.is_hdfs()) {
1251 if (new_uri.is_hdfs())
1252 #ifdef HAVE_HDFS
1253 return LOG_STATUS(Status::IOError(
1254 std::string("Copying files on HDFS is not yet supported.")));
1255 #else
1256 return LOG_STATUS(
1257 Status::VFSError("TileDB was built without HDFS support"));
1258 #endif
1259 return LOG_STATUS(Status::VFSError(
1260 "Copying files across filesystems is not supported yet"));
1261 }
1262
1263 // S3
1264 if (old_uri.is_s3()) {
1265 if (new_uri.is_s3())
1266 #ifdef HAVE_S3
1267 return s3_.copy_file(old_uri, new_uri);
1268 #else
1269 return LOG_STATUS(
1270 Status::VFSError("TileDB was built without S3 support"));
1271 #endif
1272 return LOG_STATUS(Status::VFSError(
1273 "Copying files across filesystems is not supported yet"));
1274 }
1275
1276 // Azure
1277 if (old_uri.is_azure()) {
1278 if (new_uri.is_azure())
1279 #ifdef HAVE_AZURE
1280 return LOG_STATUS(Status::IOError(
1281 std::string("Copying files on Azure is not yet supported.")));
1282 #else
1283 return LOG_STATUS(
1284 Status::VFSError("TileDB was built without Azure support"));
1285 #endif
1286 return LOG_STATUS(Status::VFSError(
1287 "Copying files across filesystems is not supported yet"));
1288 }
1289
1290 // GCS
1291 if (old_uri.is_gcs()) {
1292 if (new_uri.is_gcs())
1293 #ifdef HAVE_GCS
1294 return LOG_STATUS(Status::IOError(
1295 std::string("Copying files on GCS is not yet supported.")));
1296 #else
1297 return LOG_STATUS(
1298 Status::VFSError("TileDB was built without GCS support"));
1299 #endif
1300 return LOG_STATUS(Status::VFSError(
1301 "Copying files across filesystems is not supported yet"));
1302 }
1303
1304 // Unsupported filesystem
1305 return LOG_STATUS(Status::VFSError(
1306 "Unsupported URI schemes: " + old_uri.to_string() + ", " +
1307 new_uri.to_string()));
1308 }
1309
copy_dir(const URI & old_uri,const URI & new_uri)1310 Status VFS::copy_dir(const URI& old_uri, const URI& new_uri) {
1311 if (!init_)
1312 return LOG_STATUS(
1313 Status::VFSError("Cannot copy directory; VFS not initialized"));
1314
1315 // File
1316 if (old_uri.is_file()) {
1317 if (new_uri.is_file()) {
1318 #ifdef _WIN32
1319 return LOG_STATUS(Status::IOError(
1320 std::string("Copying directories on Windows is not yet supported.")));
1321 #else
1322 return posix_.copy_dir(old_uri.to_path(), new_uri.to_path());
1323 #endif
1324 }
1325 return LOG_STATUS(Status::VFSError(
1326 "Copying directories across filesystems is not supported yet"));
1327 }
1328
1329 // HDFS
1330 if (old_uri.is_hdfs()) {
1331 if (new_uri.is_hdfs())
1332 #ifdef HAVE_HDFS
1333 return LOG_STATUS(Status::IOError(
1334 std::string("Copying directories on HDFS is not yet supported.")));
1335 #else
1336 return LOG_STATUS(
1337 Status::VFSError("TileDB was built without HDFS support"));
1338 #endif
1339 return LOG_STATUS(Status::VFSError(
1340 "Copying directories across filesystems is not supported yet"));
1341 }
1342
1343 // S3
1344 if (old_uri.is_s3()) {
1345 if (new_uri.is_s3())
1346 #ifdef HAVE_S3
1347 return s3_.copy_dir(old_uri, new_uri);
1348 #else
1349 return LOG_STATUS(
1350 Status::VFSError("TileDB was built without S3 support"));
1351 #endif
1352 return LOG_STATUS(Status::VFSError(
1353 "Copying directories across filesystems is not supported yet"));
1354 }
1355
1356 // Azure
1357 if (old_uri.is_azure()) {
1358 if (new_uri.is_azure())
1359 #ifdef HAVE_AZURE
1360 return LOG_STATUS(Status::IOError(
1361 std::string("Copying directories on Azure is not yet supported.")));
1362 #else
1363 return LOG_STATUS(
1364 Status::VFSError("TileDB was built without Azure support"));
1365 #endif
1366 return LOG_STATUS(Status::VFSError(
1367 "Copying directories across filesystems is not supported yet"));
1368 }
1369
1370 // GCS
1371 if (old_uri.is_gcs()) {
1372 if (new_uri.is_gcs())
1373 #ifdef HAVE_GCS
1374 return LOG_STATUS(Status::IOError(
1375 std::string("Copying directories on GCS is not yet supported.")));
1376 #else
1377 return LOG_STATUS(
1378 Status::VFSError("TileDB was built without GCS support"));
1379 #endif
1380 return LOG_STATUS(Status::VFSError(
1381 "Copying directories across filesystems is not supported yet"));
1382 }
1383
1384 // Unsupported filesystem
1385 return LOG_STATUS(Status::VFSError(
1386 "Unsupported URI schemes: " + old_uri.to_string() + ", " +
1387 new_uri.to_string()));
1388 }
1389
read(const URI & uri,const uint64_t offset,void * const buffer,const uint64_t nbytes,bool use_read_ahead)1390 Status VFS::read(
1391 const URI& uri,
1392 const uint64_t offset,
1393 void* const buffer,
1394 const uint64_t nbytes,
1395 bool use_read_ahead) {
1396 stats_->add_counter("read_byte_num", nbytes);
1397
1398 if (!init_)
1399 return LOG_STATUS(Status::VFSError("Cannot read; VFS not initialized"));
1400
1401 // Get config params
1402 bool found;
1403 uint64_t min_parallel_size = 0;
1404 RETURN_NOT_OK(config_.get<uint64_t>(
1405 "vfs.min_parallel_size", &min_parallel_size, &found));
1406 assert(found);
1407 uint64_t max_ops = 0;
1408 RETURN_NOT_OK(max_parallel_ops(uri, &max_ops));
1409
1410 // Ensure that each thread is responsible for at least min_parallel_size
1411 // bytes, and cap the number of parallel operations at the configured maximum
1412 // number.
1413 uint64_t num_ops =
1414 std::min(std::max(nbytes / min_parallel_size, uint64_t(1)), max_ops);
1415
1416 if (num_ops == 1) {
1417 return read_impl(uri, offset, buffer, nbytes, use_read_ahead);
1418 } else {
1419 // we don't want read-ahead when performing random access reads
1420 use_read_ahead = false;
1421 std::vector<ThreadPool::Task> results;
1422 uint64_t thread_read_nbytes = utils::math::ceil(nbytes, num_ops);
1423
1424 for (uint64_t i = 0; i < num_ops; i++) {
1425 uint64_t begin = i * thread_read_nbytes,
1426 end = std::min((i + 1) * thread_read_nbytes - 1, nbytes - 1);
1427 uint64_t thread_nbytes = end - begin + 1;
1428 uint64_t thread_offset = offset + begin;
1429 auto thread_buffer = reinterpret_cast<char*>(buffer) + begin;
1430 auto task = cancelable_tasks_.execute(
1431 io_tp_,
1432 [this,
1433 uri,
1434 thread_offset,
1435 thread_buffer,
1436 thread_nbytes,
1437 use_read_ahead]() {
1438 return read_impl(
1439 uri,
1440 thread_offset,
1441 thread_buffer,
1442 thread_nbytes,
1443 use_read_ahead);
1444 });
1445 results.push_back(std::move(task));
1446 }
1447 Status st = io_tp_->wait_all(results);
1448 if (!st.ok()) {
1449 std::stringstream errmsg;
1450 errmsg << "VFS parallel read error '" << uri.to_string() << "'; "
1451 << st.message();
1452 return LOG_STATUS(Status::VFSError(errmsg.str()));
1453 }
1454 return st;
1455 }
1456 }
1457
read_impl(const URI & uri,const uint64_t offset,void * const buffer,const uint64_t nbytes,const bool use_read_ahead)1458 Status VFS::read_impl(
1459 const URI& uri,
1460 const uint64_t offset,
1461 void* const buffer,
1462 const uint64_t nbytes,
1463 const bool use_read_ahead) {
1464 stats_->add_counter("read_ops_num", 1);
1465
1466 // We only check to use the read-ahead cache for cloud-storage
1467 // backends. No-op the `use_read_ahead` to prevent the unused
1468 // variable compiler warning.
1469 (void)use_read_ahead;
1470
1471 if (uri.is_file()) {
1472 #ifdef _WIN32
1473 return win_.read(uri.to_path(), offset, buffer, nbytes);
1474 #else
1475 return posix_.read(uri.to_path(), offset, buffer, nbytes);
1476 #endif
1477 }
1478 if (uri.is_hdfs()) {
1479 #ifdef HAVE_HDFS
1480 return hdfs_->read(uri, offset, buffer, nbytes);
1481 #else
1482 return LOG_STATUS(
1483 Status::VFSError("TileDB was built without HDFS support"));
1484 #endif
1485 }
1486 if (uri.is_s3()) {
1487 #ifdef HAVE_S3
1488 const auto read_fn = std::bind(
1489 &S3::read,
1490 &s3_,
1491 std::placeholders::_1,
1492 std::placeholders::_2,
1493 std::placeholders::_3,
1494 std::placeholders::_4,
1495 std::placeholders::_5,
1496 std::placeholders::_6);
1497 return read_ahead_impl(
1498 read_fn, uri, offset, buffer, nbytes, use_read_ahead);
1499 #else
1500 return LOG_STATUS(Status::VFSError("TileDB was built without S3 support"));
1501 #endif
1502 }
1503 if (uri.is_azure()) {
1504 #ifdef HAVE_AZURE
1505 const auto read_fn = std::bind(
1506 &Azure::read,
1507 &azure_,
1508 std::placeholders::_1,
1509 std::placeholders::_2,
1510 std::placeholders::_3,
1511 std::placeholders::_4,
1512 std::placeholders::_5,
1513 std::placeholders::_6);
1514 return read_ahead_impl(
1515 read_fn, uri, offset, buffer, nbytes, use_read_ahead);
1516 #else
1517 return LOG_STATUS(
1518 Status::VFSError("TileDB was built without Azure support"));
1519 #endif
1520 }
1521 if (uri.is_gcs()) {
1522 #ifdef HAVE_GCS
1523 const auto read_fn = std::bind(
1524 &GCS::read,
1525 &gcs_,
1526 std::placeholders::_1,
1527 std::placeholders::_2,
1528 std::placeholders::_3,
1529 std::placeholders::_4,
1530 std::placeholders::_5,
1531 std::placeholders::_6);
1532 return read_ahead_impl(
1533 read_fn, uri, offset, buffer, nbytes, use_read_ahead);
1534 #else
1535 return LOG_STATUS(Status::VFSError("TileDB was built without GCS support"));
1536 #endif
1537 }
1538 if (uri.is_memfs()) {
1539 return memfs_.read(uri.to_path(), offset, buffer, nbytes);
1540 }
1541
1542 return LOG_STATUS(
1543 Status::VFSError("Unsupported URI schemes: " + uri.to_string()));
1544 }
1545
read_ahead_impl(const std::function<Status (const URI &,off_t,void *,uint64_t,uint64_t,uint64_t *)> & read_fn,const URI & uri,const uint64_t offset,void * const buffer,const uint64_t nbytes,const bool use_read_ahead)1546 Status VFS::read_ahead_impl(
1547 const std::function<Status(
1548 const URI&, off_t, void*, uint64_t, uint64_t, uint64_t*)>& read_fn,
1549 const URI& uri,
1550 const uint64_t offset,
1551 void* const buffer,
1552 const uint64_t nbytes,
1553 const bool use_read_ahead) {
1554 // Stores the total number of bytes read.
1555 uint64_t nbytes_read = 0;
1556
1557 // Do not use the read-ahead cache if disabled by the caller.
1558 if (!use_read_ahead)
1559 return read_fn(uri, offset, buffer, nbytes, 0, &nbytes_read);
1560
1561 // Only perform a read-ahead if the requested read size
1562 // is smaller than the size of the buffers in the read-ahead
1563 // cache. This is because:
1564 // 1. The read-ahead is primarily beneficial for IO patterns
1565 // that consist of numerous small reads.
1566 // 2. Large reads may evict cached buffers that would be useful
1567 // to a future small read.
1568 // 3. It saves us a copy. We must make a copy of the buffer at
1569 // some point (one for the user, one for the cache).
1570 if (nbytes >= read_ahead_size_)
1571 return read_fn(uri, offset, buffer, nbytes, 0, &nbytes_read);
1572
1573 // Avoid a read if the requested buffer can be read from the
1574 // read cache. Note that we intentionally do not use a read
1575 // cache for local files because we rely on the operating
1576 // system's file system to cache readahead data in memory.
1577 // Additionally, we do not perform readahead with HDFS.
1578 bool success;
1579 RETURN_NOT_OK(read_ahead_cache_->read(uri, offset, buffer, nbytes, &success));
1580 if (success)
1581 return Status::Ok();
1582
1583 // We will read directly into the read-ahead buffer and then copy
1584 // the subrange of this buffer back to the user to satisfy the
1585 // read request.
1586 Buffer ra_buffer;
1587 RETURN_NOT_OK(ra_buffer.realloc(read_ahead_size_));
1588
1589 // Calculate the exact number of bytes to populate `ra_buffer`
1590 // with `read_ahead_size_` bytes.
1591 const uint64_t ra_nbytes = read_ahead_size_ - nbytes;
1592
1593 // Read into `ra_buffer`.
1594 RETURN_NOT_OK(
1595 read_fn(uri, offset, ra_buffer.data(), nbytes, ra_nbytes, &nbytes_read));
1596
1597 // Copy the requested read range back into the caller's output `buffer`.
1598 assert(nbytes_read >= nbytes);
1599 std::memcpy(buffer, ra_buffer.data(), nbytes);
1600
1601 // Cache `ra_buffer` at `offset`.
1602 ra_buffer.set_size(nbytes_read);
1603 RETURN_NOT_OK(read_ahead_cache_->insert(uri, offset, std::move(ra_buffer)));
1604
1605 return Status::Ok();
1606 }
1607
read_all(const URI & uri,const std::vector<std::tuple<uint64_t,void *,uint64_t>> & regions,ThreadPool * thread_pool,std::vector<ThreadPool::Task> * tasks,const bool use_read_ahead)1608 Status VFS::read_all(
1609 const URI& uri,
1610 const std::vector<std::tuple<uint64_t, void*, uint64_t>>& regions,
1611 ThreadPool* thread_pool,
1612 std::vector<ThreadPool::Task>* tasks,
1613 const bool use_read_ahead) {
1614 if (!init_)
1615 return LOG_STATUS(Status::VFSError("Cannot read all; VFS not initialized"));
1616
1617 if (regions.empty())
1618 return Status::Ok();
1619
1620 // Convert the individual regions into batched regions.
1621 std::vector<BatchedRead> batches;
1622 RETURN_NOT_OK(compute_read_batches(regions, &batches));
1623
1624 // Read all the batches and copy to the original destinations.
1625 for (const auto& batch : batches) {
1626 URI uri_copy = uri;
1627 BatchedRead batch_copy = batch;
1628 auto task =
1629 thread_pool->execute([this, uri_copy, batch_copy, use_read_ahead]() {
1630 Buffer buffer;
1631 RETURN_NOT_OK(buffer.realloc(batch_copy.nbytes));
1632 RETURN_NOT_OK(read(
1633 uri_copy,
1634 batch_copy.offset,
1635 buffer.data(),
1636 batch_copy.nbytes,
1637 use_read_ahead));
1638 // Parallel copy back into the individual destinations.
1639 for (uint64_t i = 0; i < batch_copy.regions.size(); i++) {
1640 const auto& region = batch_copy.regions[i];
1641 uint64_t offset = std::get<0>(region);
1642 void* dest = std::get<1>(region);
1643 uint64_t nbytes = std::get<2>(region);
1644 std::memcpy(dest, buffer.data(offset - batch_copy.offset), nbytes);
1645 }
1646
1647 return Status::Ok();
1648 });
1649
1650 tasks->push_back(std::move(task));
1651 }
1652
1653 return Status::Ok();
1654 }
1655
compute_read_batches(const std::vector<std::tuple<uint64_t,void *,uint64_t>> & regions,std::vector<BatchedRead> * batches) const1656 Status VFS::compute_read_batches(
1657 const std::vector<std::tuple<uint64_t, void*, uint64_t>>& regions,
1658 std::vector<BatchedRead>* batches) const {
1659 // Get config params
1660 bool found;
1661 uint64_t min_batch_size = 0;
1662 RETURN_NOT_OK(
1663 config_.get<uint64_t>("vfs.min_batch_size", &min_batch_size, &found));
1664 assert(found);
1665 uint64_t min_batch_gap = 0;
1666 RETURN_NOT_OK(
1667 config_.get<uint64_t>("vfs.min_batch_gap", &min_batch_gap, &found));
1668 assert(found);
1669
1670 // Ensure the regions are sorted on offset.
1671 std::vector<std::tuple<uint64_t, void*, uint64_t>> sorted_regions(
1672 regions.begin(), regions.end());
1673 parallel_sort(
1674 compute_tp_,
1675 sorted_regions.begin(),
1676 sorted_regions.end(),
1677 [](const std::tuple<uint64_t, void*, uint64_t>& a,
1678 const std::tuple<uint64_t, void*, uint64_t>& b) {
1679 return std::get<0>(a) < std::get<0>(b);
1680 });
1681
1682 // Start the first batch containing only the first region.
1683 BatchedRead curr_batch(sorted_regions.front());
1684 uint64_t curr_batch_useful_bytes = curr_batch.nbytes;
1685 for (uint64_t i = 1; i < sorted_regions.size(); i++) {
1686 const auto& region = sorted_regions[i];
1687 uint64_t offset = std::get<0>(region);
1688 uint64_t nbytes = std::get<2>(region);
1689 uint64_t new_batch_size = (offset + nbytes) - curr_batch.offset;
1690 uint64_t gap = offset - (curr_batch.offset + curr_batch.nbytes);
1691 if (new_batch_size <= min_batch_size || gap <= min_batch_gap) {
1692 // Extend current batch.
1693 curr_batch.nbytes = new_batch_size;
1694 curr_batch.regions.push_back(region);
1695 curr_batch_useful_bytes += nbytes;
1696 } else {
1697 // Push the old batch and start a new one.
1698 batches->push_back(curr_batch);
1699 curr_batch.offset = offset;
1700 curr_batch.nbytes = nbytes;
1701 curr_batch.regions.clear();
1702 curr_batch.regions.push_back(region);
1703 curr_batch_useful_bytes = nbytes;
1704 }
1705 }
1706
1707 // Push the last batch
1708 batches->push_back(curr_batch);
1709
1710 return Status::Ok();
1711 }
1712
supports_fs(Filesystem fs) const1713 bool VFS::supports_fs(Filesystem fs) const {
1714 return (supported_fs_.find(fs) != supported_fs_.end());
1715 }
1716
supports_uri_scheme(const URI & uri) const1717 bool VFS::supports_uri_scheme(const URI& uri) const {
1718 if (uri.is_s3()) {
1719 return supports_fs(Filesystem::S3);
1720 } else if (uri.is_azure()) {
1721 return supports_fs(Filesystem::AZURE);
1722 } else if (uri.is_gcs()) {
1723 return supports_fs(Filesystem::GCS);
1724 } else if (uri.is_hdfs()) {
1725 return supports_fs(Filesystem::HDFS);
1726 } else {
1727 return true;
1728 }
1729 }
1730
sync(const URI & uri)1731 Status VFS::sync(const URI& uri) {
1732 if (!init_)
1733 return LOG_STATUS(Status::VFSError("Cannot sync; VFS not initialized"));
1734
1735 if (uri.is_file()) {
1736 #ifdef _WIN32
1737 return win_.sync(uri.to_path());
1738 #else
1739 return posix_.sync(uri.to_path());
1740 #endif
1741 }
1742 if (uri.is_hdfs()) {
1743 #ifdef HAVE_HDFS
1744 return hdfs_->sync(uri);
1745 #else
1746 return LOG_STATUS(
1747 Status::VFSError("TileDB was built without HDFS support"));
1748 #endif
1749 }
1750 if (uri.is_s3()) {
1751 #ifdef HAVE_S3
1752 return Status::Ok();
1753 #else
1754 return LOG_STATUS(Status::VFSError("TileDB was built without S3 support"));
1755 #endif
1756 }
1757 if (uri.is_azure()) {
1758 #ifdef HAVE_AZURE
1759 return Status::Ok();
1760 #else
1761 return LOG_STATUS(
1762 Status::VFSError("TileDB was built without Azure support"));
1763 #endif
1764 }
1765 if (uri.is_gcs()) {
1766 #ifdef HAVE_GCS
1767 return Status::Ok();
1768 #else
1769 return LOG_STATUS(Status::VFSError("TileDB was built without GCS support"));
1770 #endif
1771 }
1772 if (uri.is_memfs()) {
1773 return Status::Ok();
1774 }
1775 return LOG_STATUS(
1776 Status::VFSError("Unsupported URI scheme: " + uri.to_string()));
1777 }
1778
open_file(const URI & uri,VFSMode mode)1779 Status VFS::open_file(const URI& uri, VFSMode mode) {
1780 if (!init_)
1781 return LOG_STATUS(
1782 Status::VFSError("Cannot open file; VFS not initialized"));
1783
1784 bool is_file;
1785 RETURN_NOT_OK(this->is_file(uri, &is_file));
1786
1787 switch (mode) {
1788 case VFSMode::VFS_READ:
1789 if (!is_file)
1790 return LOG_STATUS(Status::VFSError(
1791 std::string("Cannot open file '") + uri.c_str() +
1792 "'; File does not exist"));
1793 break;
1794 case VFSMode::VFS_WRITE:
1795 if (is_file)
1796 RETURN_NOT_OK(remove_file(uri));
1797 break;
1798 case VFSMode::VFS_APPEND:
1799 if (uri.is_s3()) {
1800 #ifdef HAVE_S3
1801 return LOG_STATUS(Status::VFSError(
1802 std::string("Cannot open file '") + uri.c_str() +
1803 "'; S3 does not support append mode"));
1804 #else
1805 return LOG_STATUS(Status::VFSError(
1806 "Cannot open file; TileDB was built without S3 support"));
1807 #endif
1808 }
1809 if (uri.is_azure()) {
1810 #ifdef HAVE_AZURE
1811 return LOG_STATUS(Status::VFSError(
1812 std::string("Cannot open file '") + uri.c_str() +
1813 "'; Azure does not support append mode"));
1814 #else
1815 return LOG_STATUS(Status::VFSError(
1816 "Cannot open file; TileDB was built without Azure support"));
1817 #endif
1818 }
1819 if (uri.is_gcs()) {
1820 #ifdef HAVE_GCS
1821 return LOG_STATUS(Status::VFSError(
1822 std::string("Cannot open file '") + uri.c_str() +
1823 "'; GCS does not support append mode"));
1824 #else
1825 return LOG_STATUS(Status::VFSError(
1826 "Cannot open file; TileDB was built without GCS support"));
1827 #endif
1828 }
1829 break;
1830 }
1831
1832 return Status::Ok();
1833 }
1834
close_file(const URI & uri)1835 Status VFS::close_file(const URI& uri) {
1836 if (!init_)
1837 return LOG_STATUS(
1838 Status::VFSError("Cannot close file; VFS not initialized"));
1839
1840 if (uri.is_file()) {
1841 #ifdef _WIN32
1842 return win_.sync(uri.to_path());
1843 #else
1844 return posix_.sync(uri.to_path());
1845 #endif
1846 }
1847 if (uri.is_hdfs()) {
1848 #ifdef HAVE_HDFS
1849 return hdfs_->sync(uri);
1850 #else
1851 return LOG_STATUS(
1852 Status::VFSError("TileDB was built without HDFS support"));
1853 #endif
1854 }
1855 if (uri.is_s3()) {
1856 #ifdef HAVE_S3
1857 return s3_.flush_object(uri);
1858 #else
1859 return LOG_STATUS(Status::VFSError("TileDB was built without S3 support"));
1860 #endif
1861 }
1862 if (uri.is_azure()) {
1863 #ifdef HAVE_AZURE
1864 return azure_.flush_blob(uri);
1865 #else
1866 return LOG_STATUS(
1867 Status::VFSError("TileDB was built without Azure support"));
1868 #endif
1869 }
1870 if (uri.is_gcs()) {
1871 #ifdef HAVE_GCS
1872 return gcs_.flush_object(uri);
1873 #else
1874 return LOG_STATUS(Status::VFSError("TileDB was built without GCS support"));
1875 #endif
1876 }
1877 if (uri.is_memfs()) {
1878 return Status::Ok();
1879 }
1880 return LOG_STATUS(
1881 Status::VFSError("Unsupported URI schemes: " + uri.to_string()));
1882 }
1883
write(const URI & uri,const void * buffer,uint64_t buffer_size)1884 Status VFS::write(const URI& uri, const void* buffer, uint64_t buffer_size) {
1885 stats_->add_counter("write_byte_num", buffer_size);
1886 stats_->add_counter("write_ops_num", 1);
1887
1888 if (!init_)
1889 return LOG_STATUS(Status::VFSError("Cannot write; VFS not initialized"));
1890
1891 if (uri.is_file()) {
1892 #ifdef _WIN32
1893 return win_.write(uri.to_path(), buffer, buffer_size);
1894 #else
1895 return posix_.write(uri.to_path(), buffer, buffer_size);
1896 #endif
1897 }
1898 if (uri.is_hdfs()) {
1899 #ifdef HAVE_HDFS
1900 return hdfs_->write(uri, buffer, buffer_size);
1901 #else
1902 return LOG_STATUS(
1903 Status::VFSError("TileDB was built without HDFS support"));
1904 #endif
1905 }
1906 if (uri.is_s3()) {
1907 #ifdef HAVE_S3
1908 return s3_.write(uri, buffer, buffer_size);
1909 #else
1910 return LOG_STATUS(Status::VFSError("TileDB was built without S3 support"));
1911 #endif
1912 }
1913 if (uri.is_azure()) {
1914 #ifdef HAVE_AZURE
1915 return azure_.write(uri, buffer, buffer_size);
1916 #else
1917 return LOG_STATUS(
1918 Status::VFSError("TileDB was built without Azure support"));
1919 #endif
1920 }
1921 if (uri.is_gcs()) {
1922 #ifdef HAVE_GCS
1923 return gcs_.write(uri, buffer, buffer_size);
1924 #else
1925 return LOG_STATUS(Status::VFSError("TileDB was built without GCS support"));
1926 #endif
1927 }
1928 if (uri.is_memfs()) {
1929 return memfs_.write(uri.to_path(), buffer, buffer_size);
1930 }
1931 return LOG_STATUS(
1932 Status::VFSError("Unsupported URI schemes: " + uri.to_string()));
1933 }
1934
1935 } // namespace sm
1936 } // namespace tiledb
1937