1 /**
2 * @file storage_manager.cc
3 *
4 * @section LICENSE
5 *
6 * The MIT License
7 *
8 * @copyright Copyright (c) 2017-2021 TileDB, Inc.
9 * @copyright Copyright (c) 2016 MIT and Intel Corporation
10 *
11 * Permission is hereby granted, free of charge, to any person obtaining a copy
12 * of this software and associated documentation files (the "Software"), to deal
13 * in the Software without restriction, including without limitation the rights
14 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
15 * copies of the Software, and to permit persons to whom the Software is
16 * furnished to do so, subject to the following conditions:
17 *
18 * The above copyright notice and this permission notice shall be included in
19 * all copies or substantial portions of the Software.
20 *
21 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
22 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
23 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
24 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
25 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
26 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
27 * THE SOFTWARE.
28 *
29 * @section DESCRIPTION
30 *
31 * This file implements the StorageManager class.
32 */
33
34 #include <algorithm>
35 #include <functional>
36 #include <iostream>
37 #include <sstream>
38
39 #include "tiledb/common/heap_memory.h"
40 #include "tiledb/common/logger.h"
41 #include "tiledb/sm/array/array.h"
42 #include "tiledb/sm/array_schema/array_schema.h"
43 #include "tiledb/sm/array_schema/array_schema_evolution.h"
44 #include "tiledb/sm/cache/buffer_lru_cache.h"
45 #include "tiledb/sm/enums/array_type.h"
46 #include "tiledb/sm/enums/layout.h"
47 #include "tiledb/sm/enums/object_type.h"
48 #include "tiledb/sm/enums/query_type.h"
49 #include "tiledb/sm/filesystem/vfs.h"
50 #include "tiledb/sm/fragment/fragment_info.h"
51 #include "tiledb/sm/global_state/global_state.h"
52 #include "tiledb/sm/global_state/unit_test_config.h"
53 #include "tiledb/sm/misc/parallel_functions.h"
54 #include "tiledb/sm/misc/utils.h"
55 #include "tiledb/sm/misc/uuid.h"
56 #include "tiledb/sm/query/query.h"
57 #include "tiledb/sm/rest/rest_client.h"
58 #include "tiledb/sm/stats/global_stats.h"
59 #include "tiledb/sm/storage_manager/consolidator.h"
60 #include "tiledb/sm/storage_manager/open_array.h"
61 #include "tiledb/sm/storage_manager/storage_manager.h"
62 #include "tiledb/sm/tile/generic_tile_io.h"
63 #include "tiledb/sm/tile/tile.h"
64
65 #include <algorithm>
66 #include <iostream>
67 #include <sstream>
68
69 using namespace tiledb::common;
70
71 namespace tiledb {
72 namespace sm {
73
74 /* ****************************** */
75 /* CONSTRUCTORS & DESTRUCTORS */
76 /* ****************************** */
77
StorageManager(ThreadPool * const compute_tp,ThreadPool * const io_tp,stats::Stats * const parent_stats,tdb_shared_ptr<Logger> logger)78 StorageManager::StorageManager(
79 ThreadPool* const compute_tp,
80 ThreadPool* const io_tp,
81 stats::Stats* const parent_stats,
82 tdb_shared_ptr<Logger> logger)
83 : stats_(parent_stats->create_child("StorageManager"))
84 , logger_(logger)
85 , cancellation_in_progress_(false)
86 , queries_in_progress_(0)
87 , compute_tp_(compute_tp)
88 , io_tp_(io_tp)
89 , vfs_(nullptr) {
90 }
91
~StorageManager()92 StorageManager::~StorageManager() {
93 global_state::GlobalState::GetGlobalState().unregister_storage_manager(this);
94
95 if (vfs_ != nullptr)
96 cancel_all_tasks();
97
98 // Release all filelocks and delete all opened arrays for reads
99 for (auto& open_array_it : open_arrays_for_reads_) {
100 open_array_it.second->file_unlock(vfs_);
101 tdb_delete(open_array_it.second);
102 }
103
104 // Delete all opened arrays for writes
105 for (auto& open_array_it : open_arrays_for_writes_)
106 tdb_delete(open_array_it.second);
107
108 for (auto& fl_it : xfilelocks_) {
109 auto filelock = fl_it.second;
110 auto lock_uri = URI(fl_it.first).join_path(constants::filelock_name);
111 if (filelock != INVALID_FILELOCK)
112 vfs_->filelock_unlock(lock_uri);
113 }
114
115 if (vfs_ != nullptr) {
116 const Status st = vfs_->terminate();
117 if (!st.ok()) {
118 logger_->status(Status::StorageManagerError("Failed to terminate VFS."));
119 }
120
121 tdb_delete(vfs_);
122 }
123 }
124
125 /* ****************************** */
126 /* API */
127 /* ****************************** */
128
array_close_for_reads(const URI & array_uri)129 Status StorageManager::array_close_for_reads(const URI& array_uri) {
130 // Lock mutex
131 std::lock_guard<std::mutex> lock{open_array_for_reads_mtx_};
132
133 // Find the open array entry
134 auto it = open_arrays_for_reads_.find(array_uri.to_string());
135
136 // Do nothing if array is closed
137 if (it == open_arrays_for_reads_.end()) {
138 return Status::Ok();
139 }
140
141 // For easy reference
142 OpenArray* open_array = it->second;
143
144 // Lock the mutex of the array and decrement counter
145 open_array->mtx_lock();
146 open_array->cnt_decr();
147
148 // Close the array if the counter reaches 0
149 if (open_array->cnt() == 0) {
150 // Release file lock
151 auto st = open_array->file_unlock(vfs_);
152 if (!st.ok()) {
153 open_array->mtx_unlock();
154 return st;
155 }
156 // Remove open array entry
157 open_array->mtx_unlock();
158 tdb_delete(open_array);
159 open_arrays_for_reads_.erase(it);
160 } else { // Just unlock the array mutex
161 open_array->mtx_unlock();
162 }
163
164 xlock_cv_.notify_all();
165
166 return Status::Ok();
167 }
168
array_close_for_writes(const URI & array_uri,const EncryptionKey & encryption_key,Metadata * array_metadata)169 Status StorageManager::array_close_for_writes(
170 const URI& array_uri,
171 const EncryptionKey& encryption_key,
172 Metadata* array_metadata) {
173 // Lock mutex
174 std::lock_guard<std::mutex> lock{open_array_for_writes_mtx_};
175
176 // Find the open array entry
177 auto it = open_arrays_for_writes_.find(array_uri.to_string());
178
179 // Do nothing if array is closed
180 if (it == open_arrays_for_writes_.end()) {
181 return Status::Ok();
182 }
183
184 // For easy reference
185 OpenArray* open_array = it->second;
186
187 // Flush the array metadata
188 RETURN_NOT_OK(
189 store_array_metadata(array_uri, encryption_key, array_metadata));
190
191 // Lock the mutex of the array and decrement counter
192 open_array->mtx_lock();
193 open_array->cnt_decr();
194
195 // Close the array if the counter reaches 0
196 if (open_array->cnt() == 0) {
197 open_array->mtx_unlock();
198 tdb_delete(open_array);
199 open_arrays_for_writes_.erase(it);
200 } else { // Just unlock the array mutex
201 open_array->mtx_unlock();
202 }
203
204 return Status::Ok();
205 }
206
207 std::tuple<
208 Status,
209 std::optional<ArraySchema*>,
210 std::optional<std::unordered_map<std::string, tdb_shared_ptr<ArraySchema>>>,
211 std::optional<std::vector<tdb_shared_ptr<FragmentMetadata>>>>
array_open_for_reads(const URI & array_uri,const EncryptionKey & enc_key,uint64_t timestamp_start,uint64_t timestamp_end)212 StorageManager::array_open_for_reads(
213 const URI& array_uri,
214 const EncryptionKey& enc_key,
215 uint64_t timestamp_start,
216 uint64_t timestamp_end) {
217 auto timer_se = stats_->start_timer("read_array_open");
218
219 /* NOTE: these variables may be modified on a different thread
220 in the load_array_fragments_task below.
221 */
222 std::vector<TimestampedURI> fragments_to_load;
223 std::vector<URI> fragment_uris;
224 URI meta_uri;
225 Buffer f_buff;
226 std::unordered_map<std::string, uint64_t> offsets;
227
228 // Fetch array fragments async
229 std::vector<ThreadPool::Task> load_array_fragments_task;
230 load_array_fragments_task.emplace_back(io_tp_->execute([array_uri,
231 &enc_key,
232 &f_buff,
233 &fragments_to_load,
234 &fragment_uris,
235 &meta_uri,
236 &offsets,
237 ×tamp_start,
238 ×tamp_end,
239 this]() {
240 // Determine which fragments to load
241 RETURN_NOT_OK(get_fragment_uris(array_uri, &fragment_uris, &meta_uri));
242 RETURN_NOT_OK(get_sorted_uris(
243 fragment_uris, &fragments_to_load, timestamp_start, timestamp_end));
244 // Get the consolidated fragment metadata
245 RETURN_NOT_OK(
246 load_consolidated_fragment_meta(meta_uri, enc_key, &f_buff, &offsets));
247 return Status::Ok();
248 }));
249
250 // Wait for array fragments to be loaded.
251 Status st = io_tp_->wait_all(load_array_fragments_task);
252
253 auto open_array = (OpenArray*)nullptr;
254 st = array_open_without_fragments(array_uri, enc_key, &open_array);
255
256 if (!st.ok()) {
257 io_tp_->wait_all(load_array_fragments_task);
258 return {st, std::nullopt, std::nullopt, std::nullopt};
259 }
260
261 if (!st.ok()) {
262 open_array->mtx_unlock();
263 array_close_for_reads(array_uri);
264 return {st, std::nullopt, std::nullopt, std::nullopt};
265 }
266
267 // Get fragment metadata in the case of reads, if not fetched already
268 std::vector<tdb_shared_ptr<FragmentMetadata>> fragment_metadata;
269 st = load_fragment_metadata(
270 open_array,
271 enc_key,
272 fragments_to_load,
273 &f_buff,
274 offsets,
275 &fragment_metadata);
276
277 if (!st.ok()) {
278 open_array->mtx_unlock();
279 array_close_for_reads(array_uri);
280 return {st, std::nullopt, std::nullopt, std::nullopt};
281 }
282
283 // Unlock the array mutex
284 open_array->mtx_unlock();
285
286 // Note that we retain the (shared) lock on the array filelock
287 return {Status::Ok(),
288 open_array->array_schema_latest(),
289 open_array->array_schemas_all(),
290 fragment_metadata};
291 }
292
293 std::tuple<
294 Status,
295 std::optional<ArraySchema*>,
296 std::optional<std::unordered_map<std::string, tdb_shared_ptr<ArraySchema>>>>
array_open_for_reads_without_fragments(const URI & array_uri,const EncryptionKey & enc_key)297 StorageManager::array_open_for_reads_without_fragments(
298 const URI& array_uri, const EncryptionKey& enc_key) {
299 auto timer_se = stats_->start_timer("read_array_open");
300
301 auto open_array = (OpenArray*)nullptr;
302 Status st = array_open_without_fragments(array_uri, enc_key, &open_array);
303 if (!st.ok()) {
304 return {st, std::nullopt, std::nullopt};
305 }
306
307 // Unlock the array mutex
308 open_array->mtx_unlock();
309
310 // Note that we retain the (shared) lock on the array filelock
311 return {Status::Ok(),
312 open_array->array_schema_latest(),
313 open_array->array_schemas_all()};
314 }
315
316 std::tuple<
317 Status,
318 std::optional<ArraySchema*>,
319 std::optional<std::unordered_map<std::string, tdb_shared_ptr<ArraySchema>>>>
array_open_for_writes(const URI & array_uri,const EncryptionKey & encryption_key)320 StorageManager::array_open_for_writes(
321 const URI& array_uri, const EncryptionKey& encryption_key) {
322 if (!vfs_->supports_uri_scheme(array_uri))
323 return {logger_->status(Status::StorageManagerError(
324 "Cannot open array; URI scheme unsupported.")),
325 std::nullopt,
326 std::nullopt};
327
328 // Check if array exists
329 ObjectType obj_type;
330 auto st = this->object_type(array_uri, &obj_type);
331 if (!st.ok())
332 return {st, std::nullopt, std::nullopt};
333
334 if (obj_type != ObjectType::ARRAY) {
335 return {logger_->status(Status::StorageManagerError(
336 "Cannot open array; Array does not exist")),
337 std::nullopt,
338 std::nullopt};
339 }
340
341 auto open_array = (OpenArray*)nullptr;
342
343 // Lock mutex
344 {
345 std::lock_guard<std::mutex> lock{open_array_for_writes_mtx_};
346
347 // Find the open array entry and check key correctness
348 auto it = open_arrays_for_writes_.find(array_uri.to_string());
349 if (it != open_arrays_for_writes_.end()) {
350 st = it->second->set_encryption_key(encryption_key);
351 if (!st.ok())
352 return {st, std::nullopt, std::nullopt};
353
354 open_array = it->second;
355 } else { // Create a new entry
356 open_array = tdb_new(OpenArray, array_uri, QueryType::WRITE);
357 st = open_array->set_encryption_key(encryption_key);
358 if (!st.ok()) {
359 tdb_delete(open_array) return {st, std::nullopt, std::nullopt};
360 }
361
362 open_arrays_for_writes_[array_uri.to_string()] = open_array;
363 }
364
365 // Lock the array and increment counter
366 open_array->mtx_lock();
367 open_array->cnt_incr();
368 }
369
370 // No shared filelock needed to be acquired
371
372 // When opening the array we want to always reload the schema
373 // Fragments are always relisted, now with schema evolution we
374 // need to make sure we list out any new schemas too.
375 // Fragment listing is significantly slower than listing schemas
376 // and they happen in parallel so this is low impact
377 st = load_array_schema(array_uri, open_array, encryption_key);
378 if (!st.ok()) {
379 open_array->mtx_unlock();
380 array_close_for_writes(array_uri, encryption_key, nullptr);
381 return {st, std::nullopt, std::nullopt};
382 }
383
384 // This library should not be able to write to newer-versioned arrays
385 // (but it is ok to write to older arrays)
386 if (open_array->array_schema_latest()->version() >
387 constants::format_version) {
388 std::stringstream err;
389 err << "Cannot open array for writes; Array format version (";
390 err << open_array->array_schema_latest()->version();
391 err << ") is newer than library format version (";
392 err << constants::format_version << ")";
393 open_array->mtx_unlock();
394 array_close_for_writes(array_uri, encryption_key, nullptr);
395 return {logger_->status(Status::StorageManagerError(err.str())),
396 std::nullopt,
397 std::nullopt};
398 }
399
400 // Unlock the array mutex
401 open_array->mtx_unlock();
402
403 return {Status::Ok(),
404 open_array->array_schema_latest(),
405 open_array->array_schemas_all()};
406 }
407
array_load_fragments(const URI & array_uri,const EncryptionKey & enc_key,std::vector<tdb_shared_ptr<FragmentMetadata>> * fragment_metadata,const std::vector<TimestampedURI> & fragments_to_load)408 Status StorageManager::array_load_fragments(
409 const URI& array_uri,
410 const EncryptionKey& enc_key,
411 std::vector<tdb_shared_ptr<FragmentMetadata>>* fragment_metadata,
412 const std::vector<TimestampedURI>& fragments_to_load) {
413 auto timer_se = stats_->start_timer("read_array_open");
414
415 auto open_array = (OpenArray*)nullptr;
416 // Lock mutex
417 {
418 std::lock_guard<std::mutex> lock{open_array_for_reads_mtx_};
419
420 // Find the open array entry
421 auto it = open_arrays_for_reads_.find(array_uri.to_string());
422 if (it == open_arrays_for_reads_.end()) {
423 return logger_->status(Status::StorageManagerError(
424 std::string("Cannot reopen array ") + array_uri.to_string() +
425 "; Array not open"));
426 }
427 RETURN_NOT_OK(it->second->set_encryption_key(enc_key));
428 open_array = it->second;
429
430 // Lock the array
431 open_array->mtx_lock();
432 }
433
434 std::unordered_map<std::string, uint64_t> offsets;
435
436 // Get fragment metadata in the case of reads, if not fetched already
437 auto st = load_fragment_metadata(
438 open_array,
439 enc_key,
440 fragments_to_load,
441 nullptr,
442 offsets,
443 fragment_metadata);
444 if (!st.ok()) {
445 open_array->mtx_unlock();
446 array_close_for_reads(array_uri);
447 return st;
448 }
449
450 // Unlock the mutexes
451 open_array->mtx_unlock();
452
453 return st;
454 }
455
456 std::tuple<
457 Status,
458 std::optional<ArraySchema*>,
459 std::optional<std::unordered_map<std::string, tdb_shared_ptr<ArraySchema>>>,
460 std::optional<std::vector<tdb_shared_ptr<FragmentMetadata>>>>
array_reopen(const URI & array_uri,const EncryptionKey & enc_key,uint64_t timestamp_start,uint64_t timestamp_end)461 StorageManager::array_reopen(
462 const URI& array_uri,
463 const EncryptionKey& enc_key,
464 uint64_t timestamp_start,
465 uint64_t timestamp_end) {
466 auto timer_se = stats_->start_timer("read_array_open");
467
468 auto open_array = (OpenArray*)nullptr;
469
470 // Lock mutex
471 {
472 std::lock_guard<std::mutex> lock{open_array_for_reads_mtx_};
473
474 // Find the open array entry
475 auto it = open_arrays_for_reads_.find(array_uri.to_string());
476 if (it == open_arrays_for_reads_.end()) {
477 return {logger_->status(Status::StorageManagerError(
478 std::string("Cannot reopen array ") + array_uri.to_string() +
479 "; Array not open")),
480 std::nullopt,
481 std::nullopt,
482 std::nullopt};
483 }
484 auto st = it->second->set_encryption_key(enc_key);
485 if (!st.ok())
486 return {st, std::nullopt, std::nullopt, std::nullopt};
487
488 open_array = it->second;
489
490 // Lock the array
491 open_array->mtx_lock();
492 }
493
494 // Determine which fragments to load
495 std::vector<TimestampedURI> fragments_to_load;
496 std::vector<URI> fragment_uris;
497 URI meta_uri;
498 auto st = get_fragment_uris(array_uri, &fragment_uris, &meta_uri);
499 if (!st.ok())
500 return {st, std::nullopt, std::nullopt, std::nullopt};
501 st = get_sorted_uris(
502 fragment_uris, &fragments_to_load, timestamp_start, timestamp_end);
503 if (!st.ok())
504 return {st, std::nullopt, std::nullopt, std::nullopt};
505
506 // Get the consolidated fragment metadata
507 Buffer f_buff;
508 std::unordered_map<std::string, uint64_t> offsets;
509 st = load_consolidated_fragment_meta(meta_uri, enc_key, &f_buff, &offsets);
510 if (!st.ok())
511 return {st, std::nullopt, std::nullopt, std::nullopt};
512
513 // Get fragment metadata in the case of reads, if not fetched already
514 std::vector<tdb_shared_ptr<FragmentMetadata>> fragment_metadata;
515 st = load_fragment_metadata(
516 open_array,
517 enc_key,
518 fragments_to_load,
519 &f_buff,
520 offsets,
521 &fragment_metadata);
522 if (!st.ok()) {
523 open_array->mtx_unlock();
524 array_close_for_reads(array_uri);
525 return {st, std::nullopt, std::nullopt, std::nullopt};
526 }
527
528 // Unlock the mutexes
529 open_array->mtx_unlock();
530
531 return {st,
532 open_array->array_schema_latest(),
533 open_array->array_schemas_all(),
534 fragment_metadata};
535 }
536
array_consolidate(const char * array_name,EncryptionType encryption_type,const void * encryption_key,uint32_t key_length,const Config * config)537 Status StorageManager::array_consolidate(
538 const char* array_name,
539 EncryptionType encryption_type,
540 const void* encryption_key,
541 uint32_t key_length,
542 const Config* config) {
543 // Check array URI
544 URI array_uri(array_name);
545 if (array_uri.is_invalid()) {
546 return logger_->status(
547 Status::StorageManagerError("Cannot consolidate array; Invalid URI"));
548 }
549
550 // Check if array exists
551 ObjectType obj_type;
552 RETURN_NOT_OK(object_type(array_uri, &obj_type));
553
554 if (obj_type != ObjectType::ARRAY) {
555 return logger_->status(Status::StorageManagerError(
556 "Cannot consolidate array; Array does not exist"));
557 }
558
559 // If 'config' is unset, use the 'config_' that was set during initialization
560 // of this StorageManager instance.
561 if (!config) {
562 config = &config_;
563 }
564
565 // Get encryption key from config
566 std::string encryption_key_from_cfg;
567 if (!encryption_key) {
568 bool found = false;
569 encryption_key_from_cfg = config->get("sm.encryption_key", &found);
570 assert(found);
571 }
572
573 if (!encryption_key_from_cfg.empty()) {
574 encryption_key = encryption_key_from_cfg.c_str();
575 std::string encryption_type_from_cfg;
576 bool found = false;
577 encryption_type_from_cfg = config->get("sm.encryption_type", &found);
578 assert(found);
579 auto [st, et] = encryption_type_enum(encryption_type_from_cfg);
580 RETURN_NOT_OK(st);
581 encryption_type = et.value();
582
583 if (EncryptionKey::is_valid_key_length(
584 encryption_type,
585 static_cast<uint32_t>(encryption_key_from_cfg.size()))) {
586 const UnitTestConfig& unit_test_cfg = UnitTestConfig::instance();
587 if (unit_test_cfg.array_encryption_key_length.is_set()) {
588 key_length = unit_test_cfg.array_encryption_key_length.get();
589 } else {
590 key_length = static_cast<uint32_t>(encryption_key_from_cfg.size());
591 }
592 } else {
593 encryption_key = nullptr;
594 key_length = 0;
595 }
596 }
597
598 // Consolidate
599 Consolidator consolidator(this);
600 return consolidator.consolidate(
601 array_name, encryption_type, encryption_key, key_length, config);
602 }
603
array_vacuum(const char * array_name,const Config * config)604 Status StorageManager::array_vacuum(
605 const char* array_name, const Config* config) {
606 // If 'config' is unset, use the 'config_' that was set during initialization
607 // of this StorageManager instance.
608 if (!config) {
609 config = &config_;
610 }
611
612 // Get mode
613 const char* mode;
614 RETURN_NOT_OK(config->get("sm.vacuum.mode", &mode));
615
616 bool found = false;
617 uint64_t timestamp_start;
618 RETURN_NOT_OK(config->get<uint64_t>(
619 "sm.vacuum.timestamp_start", ×tamp_start, &found));
620 assert(found);
621
622 uint64_t timestamp_end;
623 RETURN_NOT_OK(
624 config->get<uint64_t>("sm.vacuum.timestamp_end", ×tamp_end, &found));
625 assert(found);
626
627 if (mode == nullptr)
628 return logger_->status(Status::StorageManagerError(
629 "Cannot vacuum array; Vacuum mode cannot be null"));
630 else if (std::string(mode) == "fragments")
631 RETURN_NOT_OK(
632 array_vacuum_fragments(array_name, timestamp_start, timestamp_end));
633 else if (std::string(mode) == "fragment_meta")
634 RETURN_NOT_OK(array_vacuum_fragment_meta(array_name));
635 else if (std::string(mode) == "array_meta")
636 RETURN_NOT_OK(
637 array_vacuum_array_meta(array_name, timestamp_start, timestamp_end));
638 else
639 return logger_->status(Status::StorageManagerError(
640 "Cannot vacuum array; Invalid vacuum mode"));
641
642 return Status::Ok();
643 }
644
array_vacuum_fragments(const char * array_name,uint64_t timestamp_start,uint64_t timestamp_end)645 Status StorageManager::array_vacuum_fragments(
646 const char* array_name, uint64_t timestamp_start, uint64_t timestamp_end) {
647 if (array_name == nullptr)
648 return logger_->status(Status::StorageManagerError(
649 "Cannot vacuum fragments; Array name cannot be null"));
650
651 // Get all URIs in the array directory
652 URI array_uri(array_name);
653 std::vector<URI> uris;
654 RETURN_NOT_OK(vfs_->ls(array_uri.add_trailing_slash(), &uris));
655
656 // Get URIs to be vacuumed
657 std::vector<URI> to_vacuum, vac_uris;
658 RETURN_NOT_OK(get_uris_to_vacuum(
659 uris, timestamp_start, timestamp_end, &to_vacuum, &vac_uris));
660
661 // Delete the ok files
662 RETURN_NOT_OK(array_xlock(array_uri));
663 auto status =
664 parallel_for(compute_tp_, 0, to_vacuum.size(), [&, this](size_t i) {
665 auto uri = URI(to_vacuum[i].to_string() + constants::ok_file_suffix);
666 RETURN_NOT_OK(vfs_->remove_file(uri));
667
668 return Status::Ok();
669 });
670 RETURN_NOT_OK_ELSE(status, array_xunlock(array_uri));
671 RETURN_NOT_OK(array_xunlock(array_uri));
672
673 // Delete fragment directories
674 status = parallel_for(compute_tp_, 0, to_vacuum.size(), [&, this](size_t i) {
675 RETURN_NOT_OK(vfs_->remove_dir(to_vacuum[i]));
676
677 return Status::Ok();
678 });
679 RETURN_NOT_OK(status);
680
681 // Delete vacuum files
682 status = parallel_for(compute_tp_, 0, vac_uris.size(), [&, this](size_t i) {
683 RETURN_NOT_OK(vfs_->remove_file(vac_uris[i]));
684 return Status::Ok();
685 });
686 RETURN_NOT_OK(status);
687
688 return Status::Ok();
689 }
690
array_vacuum_fragment_meta(const char * array_name)691 Status StorageManager::array_vacuum_fragment_meta(const char* array_name) {
692 if (array_name == nullptr)
693 return logger_->status(Status::StorageManagerError(
694 "Cannot vacuum fragment metadata; Array name cannot be null"));
695
696 // Get the consolidated fragment metadata URIs to be deleted
697 // (all except the last one)
698 URI array_uri(array_name);
699 std::vector<URI> uris, to_vacuum;
700 URI last;
701 RETURN_NOT_OK(vfs_->ls(array_uri.add_trailing_slash(), &uris));
702 RETURN_NOT_OK(get_consolidated_fragment_meta_uri(uris, &last));
703 to_vacuum.reserve(uris.size());
704 for (const auto& uri : uris) {
705 if (utils::parse::ends_with(uri.to_string(), constants::meta_file_suffix) &&
706 uri != last)
707 to_vacuum.emplace_back(uri);
708 }
709
710 // Vacuum after exclusively locking the array
711 RETURN_NOT_OK(array_xlock(array_uri));
712 auto status =
713 parallel_for(compute_tp_, 0, to_vacuum.size(), [&, this](size_t i) {
714 RETURN_NOT_OK(vfs_->remove_file(to_vacuum[i]));
715 return Status::Ok();
716 });
717 RETURN_NOT_OK_ELSE(status, array_xunlock(array_uri));
718 RETURN_NOT_OK(array_xunlock(array_uri));
719
720 return Status::Ok();
721 }
722
array_vacuum_array_meta(const char * array_name,uint64_t timestamp_start,uint64_t timestamp_end)723 Status StorageManager::array_vacuum_array_meta(
724 const char* array_name, uint64_t timestamp_start, uint64_t timestamp_end) {
725 if (array_name == nullptr)
726 return logger_->status(Status::StorageManagerError(
727 "Cannot vacuum array metadata; Array name cannot be null"));
728
729 // Get all URIs in the array directory
730 URI array_uri(array_name);
731 auto meta_uri = array_uri.join_path(constants::array_metadata_folder_name);
732 std::vector<URI> uris;
733 RETURN_NOT_OK(vfs_->ls(meta_uri.add_trailing_slash(), &uris));
734
735 // Get URIs to be vacuumed
736 std::vector<URI> to_vacuum, vac_uris;
737 RETURN_NOT_OK(get_uris_to_vacuum(
738 uris, timestamp_start, timestamp_end, &to_vacuum, &vac_uris));
739
740 // Delete the array metadata files
741 RETURN_NOT_OK(array_xlock(array_uri));
742 auto status =
743 parallel_for(compute_tp_, 0, to_vacuum.size(), [&, this](size_t i) {
744 RETURN_NOT_OK(vfs_->remove_file(to_vacuum[i]));
745
746 return Status::Ok();
747 });
748 RETURN_NOT_OK_ELSE(status, array_xunlock(array_uri));
749 RETURN_NOT_OK(array_xunlock(array_uri));
750
751 // Delete vacuum files
752 status = parallel_for(compute_tp_, 0, vac_uris.size(), [&, this](size_t i) {
753 RETURN_NOT_OK(vfs_->remove_file(vac_uris[i]));
754 return Status::Ok();
755 });
756 RETURN_NOT_OK(status);
757
758 return Status::Ok();
759 }
760
array_metadata_consolidate(const char * array_name,EncryptionType encryption_type,const void * encryption_key,uint32_t key_length,const Config * config)761 Status StorageManager::array_metadata_consolidate(
762 const char* array_name,
763 EncryptionType encryption_type,
764 const void* encryption_key,
765 uint32_t key_length,
766 const Config* config) {
767 // Check array URI
768 URI array_uri(array_name);
769 if (array_uri.is_invalid()) {
770 return logger_->status(Status::StorageManagerError(
771 "Cannot consolidate array metadata; Invalid URI"));
772 }
773 // Check if array exists
774 ObjectType obj_type;
775 RETURN_NOT_OK(object_type(array_uri, &obj_type));
776
777 if (obj_type != ObjectType::ARRAY) {
778 return logger_->status(Status::StorageManagerError(
779 "Cannot consolidate array metadata; Array does not exist"));
780 }
781
782 // If 'config' is unset, use the 'config_' that was set during initialization
783 // of this StorageManager instance.
784 if (!config) {
785 config = &config_;
786 }
787
788 // Get encryption key from config
789 std::string encryption_key_from_cfg;
790 if (!encryption_key) {
791 bool found = false;
792 encryption_key_from_cfg = config->get("sm.encryption_key", &found);
793 assert(found);
794 }
795
796 if (!encryption_key_from_cfg.empty()) {
797 encryption_key = encryption_key_from_cfg.c_str();
798 std::string encryption_type_from_cfg;
799 bool found = false;
800 encryption_type_from_cfg = config->get("sm.encryption_type", &found);
801 assert(found);
802 auto [st, et] = encryption_type_enum(encryption_type_from_cfg);
803 RETURN_NOT_OK(st);
804 encryption_type = et.value();
805
806 if (EncryptionKey::is_valid_key_length(
807 encryption_type,
808 static_cast<uint32_t>(encryption_key_from_cfg.size()))) {
809 const UnitTestConfig& unit_test_cfg = UnitTestConfig::instance();
810 if (unit_test_cfg.array_encryption_key_length.is_set()) {
811 key_length = unit_test_cfg.array_encryption_key_length.get();
812 } else {
813 key_length = static_cast<uint32_t>(encryption_key_from_cfg.size());
814 }
815 } else {
816 encryption_key = nullptr;
817 key_length = 0;
818 }
819 }
820
821 // Consolidate
822 Consolidator consolidator(this);
823 return consolidator.consolidate_array_meta(
824 array_name, encryption_type, encryption_key, key_length);
825 }
826
array_create(const URI & array_uri,ArraySchema * array_schema,const EncryptionKey & encryption_key)827 Status StorageManager::array_create(
828 const URI& array_uri,
829 ArraySchema* array_schema,
830 const EncryptionKey& encryption_key) {
831 // Check array schema
832 if (array_schema == nullptr) {
833 return logger_->status(
834 Status::StorageManagerError("Cannot create array; Empty array schema"));
835 }
836
837 // Check if array exists
838 bool exists = false;
839 RETURN_NOT_OK(is_array(array_uri, &exists));
840 if (exists)
841 return logger_->status(Status::StorageManagerError(
842 std::string("Cannot create array; Array '") + array_uri.c_str() +
843 "' already exists"));
844
845 std::lock_guard<std::mutex> lock{object_create_mtx_};
846 array_schema->set_array_uri(array_uri);
847 RETURN_NOT_OK(array_schema->generate_uri());
848 RETURN_NOT_OK(array_schema->check());
849
850 // Create array directory
851 RETURN_NOT_OK(vfs_->create_dir(array_uri));
852
853 // Create array schema directory
854 URI array_schema_folder_uri =
855 array_uri.join_path(constants::array_schema_folder_name);
856 RETURN_NOT_OK(vfs_->create_dir(array_schema_folder_uri));
857
858 // Create array metadata directory
859 URI array_metadata_uri =
860 array_uri.join_path(constants::array_metadata_folder_name);
861 RETURN_NOT_OK(vfs_->create_dir(array_metadata_uri));
862 Status st;
863
864 // Get encryption key from config
865 if (encryption_key.encryption_type() == EncryptionType::NO_ENCRYPTION) {
866 bool found = false;
867 std::string encryption_key_from_cfg =
868 config_.get("sm.encryption_key", &found);
869 assert(found);
870 std::string encryption_type_from_cfg =
871 config_.get("sm.encryption_type", &found);
872 assert(found);
873 auto [st, etc] = encryption_type_enum(encryption_type_from_cfg);
874 RETURN_NOT_OK(st);
875 EncryptionType encryption_type_cfg = etc.value();
876
877 EncryptionKey encryption_key_cfg;
878 if (encryption_key_from_cfg.empty()) {
879 RETURN_NOT_OK(
880 encryption_key_cfg.set_key(encryption_type_cfg, nullptr, 0));
881 } else {
882 uint32_t key_length = 0;
883 if (EncryptionKey::is_valid_key_length(
884 encryption_type_cfg,
885 static_cast<uint32_t>(encryption_key_from_cfg.size()))) {
886 const UnitTestConfig& unit_test_cfg = UnitTestConfig::instance();
887 if (unit_test_cfg.array_encryption_key_length.is_set()) {
888 key_length = unit_test_cfg.array_encryption_key_length.get();
889 } else {
890 key_length = static_cast<uint32_t>(encryption_key_from_cfg.size());
891 }
892 }
893 RETURN_NOT_OK(encryption_key_cfg.set_key(
894 encryption_type_cfg,
895 (const void*)encryption_key_from_cfg.c_str(),
896 key_length));
897 }
898 st = store_array_schema(array_schema, encryption_key_cfg);
899 } else {
900 st = store_array_schema(array_schema, encryption_key);
901 }
902
903 // Store array schema
904 if (!st.ok()) {
905 vfs_->remove_dir(array_uri);
906 return st;
907 }
908
909 // Create array and array metadata filelocks
910 URI array_filelock_uri = array_uri.join_path(constants::filelock_name);
911 st = vfs_->touch(array_filelock_uri);
912 if (!st.ok()) {
913 vfs_->remove_dir(array_uri);
914 return st;
915 }
916
917 return Status::Ok();
918 }
919
array_evolve_schema(const URI & array_uri,ArraySchemaEvolution * schema_evolution,const EncryptionKey & encryption_key)920 Status StorageManager::array_evolve_schema(
921 const URI& array_uri,
922 ArraySchemaEvolution* schema_evolution,
923 const EncryptionKey& encryption_key) {
924 // Check array schema
925 if (schema_evolution == nullptr) {
926 return logger_->status(Status::StorageManagerError(
927 "Cannot evolve array; Empty schema evolution"));
928 }
929
930 if (array_uri.is_tiledb()) {
931 return rest_client_->post_array_schema_evolution_to_rest(
932 array_uri, schema_evolution);
933 }
934
935 // Check if array exists
936 bool exists = false;
937 RETURN_NOT_OK(is_array(array_uri, &exists));
938 if (!exists)
939 return logger_->status(Status::StorageManagerError(
940 std::string("Cannot evolve array; Array '") + array_uri.c_str() +
941 "' not exists"));
942
943 ArraySchema* array_schema = (ArraySchema*)nullptr;
944 RETURN_NOT_OK(load_array_schema(array_uri, encryption_key, &array_schema));
945
946 // Evolve schema
947 ArraySchema* array_schema_evolved = (ArraySchema*)nullptr;
948 RETURN_NOT_OK(
949 schema_evolution->evolve_schema(array_schema, &array_schema_evolved));
950
951 Status st = store_array_schema(array_schema_evolved, encryption_key);
952 if (!st.ok()) {
953 tdb_delete(array_schema_evolved);
954 logger_->status(st);
955 return logger_->status(Status::StorageManagerError(
956 "Cannot evovle schema; Not able to store evolved array schema."));
957 }
958
959 tdb_delete(array_schema);
960 array_schema = nullptr;
961 tdb_delete(array_schema_evolved);
962 array_schema_evolved = nullptr;
963
964 return Status::Ok();
965 }
966
array_upgrade_version(const URI & array_uri,const Config * config)967 Status StorageManager::array_upgrade_version(
968 const URI& array_uri, const Config* config) {
969 // Check if array exists
970 bool exists = false;
971 RETURN_NOT_OK(is_array(array_uri, &exists));
972 if (!exists)
973 return logger_->status(Status::StorageManagerError(
974 std::string("Cannot upgrade array; Array '") + array_uri.c_str() +
975 "' does not exist"));
976
977 // If 'config' is unset, use the 'config_' that was set during initialization
978 // of this StorageManager instance.
979 if (!config) {
980 config = &config_;
981 }
982
983 // Get encryption key from config
984 bool found = false;
985 std::string encryption_key_from_cfg =
986 config_.get("sm.encryption_key", &found);
987 assert(found);
988 std::string encryption_type_from_cfg =
989 config_.get("sm.encryption_type", &found);
990 assert(found);
991 auto [st, etc] = encryption_type_enum(encryption_type_from_cfg);
992 RETURN_NOT_OK(st);
993 EncryptionType encryption_type_cfg = etc.value();
994
995 EncryptionKey encryption_key_cfg;
996 if (encryption_key_from_cfg.empty()) {
997 RETURN_NOT_OK(encryption_key_cfg.set_key(encryption_type_cfg, nullptr, 0));
998 } else {
999 uint32_t key_length = 0;
1000 if (EncryptionKey::is_valid_key_length(
1001 encryption_type_cfg,
1002 static_cast<uint32_t>(encryption_key_from_cfg.size()))) {
1003 const UnitTestConfig& unit_test_cfg = UnitTestConfig::instance();
1004 if (unit_test_cfg.array_encryption_key_length.is_set()) {
1005 key_length = unit_test_cfg.array_encryption_key_length.get();
1006 } else {
1007 key_length = static_cast<uint32_t>(encryption_key_from_cfg.size());
1008 }
1009 }
1010 RETURN_NOT_OK(encryption_key_cfg.set_key(
1011 encryption_type_cfg,
1012 (const void*)encryption_key_from_cfg.c_str(),
1013 key_length));
1014 }
1015
1016 ArraySchema* array_schema = (ArraySchema*)nullptr;
1017 RETURN_NOT_OK(
1018 load_array_schema(array_uri, encryption_key_cfg, &array_schema));
1019
1020 if (array_schema->version() < constants::format_version) {
1021 Status st = array_schema->generate_uri();
1022 if (!st.ok()) {
1023 logger_->status(st);
1024 // Clean up
1025 tdb_delete(array_schema);
1026 return st;
1027 }
1028 array_schema->set_version(constants::format_version);
1029
1030 // Create array schema directory if necessary
1031 URI array_schema_folder_uri =
1032 array_uri.join_path(constants::array_schema_folder_name);
1033 st = vfs_->create_dir(array_schema_folder_uri);
1034 if (!st.ok()) {
1035 logger_->status(st);
1036 // Clean up
1037 tdb_delete(array_schema);
1038 return st;
1039 }
1040
1041 st = store_array_schema(array_schema, encryption_key_cfg);
1042 if (!st.ok()) {
1043 logger_->status(st);
1044 // Clean up
1045 tdb_delete(array_schema);
1046 return st;
1047 }
1048 }
1049
1050 // Clean up
1051 tdb_delete(array_schema);
1052
1053 return Status::Ok();
1054 }
1055
array_memory_tracker(const URI & array_uri,bool top_level)1056 OpenArrayMemoryTracker* StorageManager::array_memory_tracker(
1057 const URI& array_uri, bool top_level) {
1058 // Lock mutex
1059 std::lock_guard<std::mutex> lock{open_array_for_reads_mtx_};
1060
1061 // Find the open array to retrieve the memory tracker.
1062 auto it = open_arrays_for_reads_.find(array_uri.to_string());
1063 if (it == open_arrays_for_reads_.end()) {
1064 if (top_level) {
1065 // TODO remove this work around once VCF runs on only context.
1066 // The memory tracker could live on another context, try to find it.
1067 auto& global_state = global_state::GlobalState::GetGlobalState();
1068 return global_state.array_memory_tracker(array_uri, this);
1069 } else {
1070 return nullptr;
1071 }
1072 }
1073
1074 return it->second->memory_tracker();
1075 }
1076
array_get_non_empty_domain(Array * array,NDRange * domain,bool * is_empty)1077 Status StorageManager::array_get_non_empty_domain(
1078 Array* array, NDRange* domain, bool* is_empty) {
1079 if (domain == nullptr)
1080 return logger_->status(Status::StorageManagerError(
1081 "Cannot get non-empty domain; Domain object is null"));
1082
1083 if (array == nullptr)
1084 return logger_->status(Status::StorageManagerError(
1085 "Cannot get non-empty domain; Array object is null"));
1086
1087 if (!array->is_remote() &&
1088 open_arrays_for_reads_.find(array->array_uri().to_string()) ==
1089 open_arrays_for_reads_.end())
1090 return logger_->status(Status::StorageManagerError(
1091 "Cannot get non-empty domain; Array not opened for reads"));
1092
1093 *domain = array->non_empty_domain();
1094 *is_empty = domain->empty();
1095
1096 return Status::Ok();
1097 }
1098
array_get_non_empty_domain(Array * array,void * domain,bool * is_empty)1099 Status StorageManager::array_get_non_empty_domain(
1100 Array* array, void* domain, bool* is_empty) {
1101 if (array == nullptr)
1102 return logger_->status(Status::StorageManagerError(
1103 "Cannot get non-empty domain; Array object is null"));
1104
1105 if (!array->array_schema_latest()->domain()->all_dims_same_type())
1106 return logger_->status(Status::StorageManagerError(
1107 "Cannot get non-empty domain; Function non-applicable to arrays with "
1108 "heterogenous dimensions"));
1109
1110 if (!array->array_schema_latest()->domain()->all_dims_fixed())
1111 return logger_->status(Status::StorageManagerError(
1112 "Cannot get non-empty domain; Function non-applicable to arrays with "
1113 "variable-sized dimensions"));
1114
1115 NDRange dom;
1116 RETURN_NOT_OK(array_get_non_empty_domain(array, &dom, is_empty));
1117 if (*is_empty)
1118 return Status::Ok();
1119
1120 auto array_schema = array->array_schema_latest();
1121 auto dim_num = array_schema->dim_num();
1122 auto domain_c = (unsigned char*)domain;
1123 uint64_t offset = 0;
1124 for (unsigned d = 0; d < dim_num; ++d) {
1125 std::memcpy(&domain_c[offset], dom[d].data(), dom[d].size());
1126 offset += dom[d].size();
1127 }
1128
1129 return Status::Ok();
1130 }
1131
array_get_non_empty_domain_from_index(Array * array,unsigned idx,void * domain,bool * is_empty)1132 Status StorageManager::array_get_non_empty_domain_from_index(
1133 Array* array, unsigned idx, void* domain, bool* is_empty) {
1134 // For easy reference
1135 auto array_schema = array->array_schema_latest();
1136 auto array_domain = array_schema->domain();
1137
1138 // Sanity checks
1139 if (idx >= array_schema->dim_num())
1140 return logger_->status(Status::StorageManagerError(
1141 "Cannot get non-empty domain; Invalid dimension index"));
1142 if (array_domain->dimension(idx)->var_size()) {
1143 std::string errmsg = "Cannot get non-empty domain; Dimension '";
1144 errmsg += array_domain->dimension(idx)->name();
1145 errmsg += "' is variable-sized";
1146 return logger_->status(Status::StorageManagerError(errmsg));
1147 }
1148
1149 NDRange dom;
1150 RETURN_NOT_OK(array_get_non_empty_domain(array, &dom, is_empty));
1151 if (*is_empty)
1152 return Status::Ok();
1153
1154 std::memcpy(domain, dom[idx].data(), dom[idx].size());
1155 return Status::Ok();
1156 }
1157
array_get_non_empty_domain_from_name(Array * array,const char * name,void * domain,bool * is_empty)1158 Status StorageManager::array_get_non_empty_domain_from_name(
1159 Array* array, const char* name, void* domain, bool* is_empty) {
1160 // Sanity check
1161 if (name == nullptr)
1162 return logger_->status(Status::StorageManagerError(
1163 "Cannot get non-empty domain; Invalid dimension name"));
1164
1165 NDRange dom;
1166 RETURN_NOT_OK(array_get_non_empty_domain(array, &dom, is_empty));
1167
1168 auto array_schema = array->array_schema_latest();
1169 auto array_domain = array_schema->domain();
1170 auto dim_num = array_schema->dim_num();
1171 for (unsigned d = 0; d < dim_num; ++d) {
1172 auto dim_name = array_schema->dimension(d)->name();
1173 if (name == dim_name) {
1174 // Sanity check
1175 if (array_domain->dimension(d)->var_size()) {
1176 std::string errmsg = "Cannot get non-empty domain; Dimension '";
1177 errmsg += dim_name + "' is variable-sized";
1178 return logger_->status(Status::StorageManagerError(errmsg));
1179 }
1180
1181 if (!*is_empty)
1182 std::memcpy(domain, dom[d].data(), dom[d].size());
1183 return Status::Ok();
1184 }
1185 }
1186
1187 return logger_->status(Status::StorageManagerError(
1188 std::string("Cannot get non-empty domain; Dimension name '") + name +
1189 "' does not exist"));
1190 }
1191
array_get_non_empty_domain_var_size_from_index(Array * array,unsigned idx,uint64_t * start_size,uint64_t * end_size,bool * is_empty)1192 Status StorageManager::array_get_non_empty_domain_var_size_from_index(
1193 Array* array,
1194 unsigned idx,
1195 uint64_t* start_size,
1196 uint64_t* end_size,
1197 bool* is_empty) {
1198 // For easy reference
1199 auto array_schema = array->array_schema_latest();
1200 auto array_domain = array_schema->domain();
1201
1202 // Sanity checks
1203 if (idx >= array_schema->dim_num())
1204 return logger_->status(Status::StorageManagerError(
1205 "Cannot get non-empty domain; Invalid dimension index"));
1206 if (!array_domain->dimension(idx)->var_size()) {
1207 std::string errmsg = "Cannot get non-empty domain; Dimension '";
1208 errmsg += array_domain->dimension(idx)->name();
1209 errmsg += "' is fixed-sized";
1210 return logger_->status(Status::StorageManagerError(errmsg));
1211 }
1212
1213 NDRange dom;
1214 RETURN_NOT_OK(array_get_non_empty_domain(array, &dom, is_empty));
1215 if (*is_empty) {
1216 *start_size = 0;
1217 *end_size = 0;
1218 return Status::Ok();
1219 }
1220
1221 *start_size = dom[idx].start_size();
1222 *end_size = dom[idx].end_size();
1223
1224 return Status::Ok();
1225 }
1226
array_get_non_empty_domain_var_size_from_name(Array * array,const char * name,uint64_t * start_size,uint64_t * end_size,bool * is_empty)1227 Status StorageManager::array_get_non_empty_domain_var_size_from_name(
1228 Array* array,
1229 const char* name,
1230 uint64_t* start_size,
1231 uint64_t* end_size,
1232 bool* is_empty) {
1233 // Sanity check
1234 if (name == nullptr)
1235 return logger_->status(Status::StorageManagerError(
1236 "Cannot get non-empty domain; Invalid dimension name"));
1237
1238 NDRange dom;
1239 RETURN_NOT_OK(array_get_non_empty_domain(array, &dom, is_empty));
1240
1241 auto array_schema = array->array_schema_latest();
1242 auto array_domain = array_schema->domain();
1243 auto dim_num = array_schema->dim_num();
1244 for (unsigned d = 0; d < dim_num; ++d) {
1245 auto dim_name = array_schema->dimension(d)->name();
1246 if (name == dim_name) {
1247 // Sanity check
1248 if (!array_domain->dimension(d)->var_size()) {
1249 std::string errmsg = "Cannot get non-empty domain; Dimension '";
1250 errmsg += dim_name + "' is fixed-sized";
1251 return logger_->status(Status::StorageManagerError(errmsg));
1252 }
1253
1254 if (*is_empty) {
1255 *start_size = 0;
1256 *end_size = 0;
1257 } else {
1258 *start_size = dom[d].start_size();
1259 *end_size = dom[d].end_size();
1260 }
1261
1262 return Status::Ok();
1263 }
1264 }
1265
1266 return logger_->status(Status::StorageManagerError(
1267 std::string("Cannot get non-empty domain; Dimension name '") + name +
1268 "' does not exist"));
1269 }
1270
array_get_non_empty_domain_var_from_index(Array * array,unsigned idx,void * start,void * end,bool * is_empty)1271 Status StorageManager::array_get_non_empty_domain_var_from_index(
1272 Array* array, unsigned idx, void* start, void* end, bool* is_empty) {
1273 // For easy reference
1274 auto array_schema = array->array_schema_latest();
1275 auto array_domain = array_schema->domain();
1276
1277 // Sanity checks
1278 if (idx >= array_schema->dim_num())
1279 return logger_->status(Status::StorageManagerError(
1280 "Cannot get non-empty domain; Invalid dimension index"));
1281 if (!array_domain->dimension(idx)->var_size()) {
1282 std::string errmsg = "Cannot get non-empty domain; Dimension '";
1283 errmsg += array_domain->dimension(idx)->name();
1284 errmsg += "' is fixed-sized";
1285 return logger_->status(Status::StorageManagerError(errmsg));
1286 }
1287
1288 NDRange dom;
1289 RETURN_NOT_OK(array_get_non_empty_domain(array, &dom, is_empty));
1290
1291 if (*is_empty)
1292 return Status::Ok();
1293
1294 std::memcpy(start, dom[idx].start(), dom[idx].start_size());
1295 std::memcpy(end, dom[idx].end(), dom[idx].end_size());
1296
1297 return Status::Ok();
1298 }
1299
array_get_non_empty_domain_var_from_name(Array * array,const char * name,void * start,void * end,bool * is_empty)1300 Status StorageManager::array_get_non_empty_domain_var_from_name(
1301 Array* array, const char* name, void* start, void* end, bool* is_empty) {
1302 // Sanity check
1303 if (name == nullptr)
1304 return logger_->status(Status::StorageManagerError(
1305 "Cannot get non-empty domain; Invalid dimension name"));
1306
1307 NDRange dom;
1308 RETURN_NOT_OK(array_get_non_empty_domain(array, &dom, is_empty));
1309
1310 auto array_schema = array->array_schema_latest();
1311 auto array_domain = array_schema->domain();
1312 auto dim_num = array_schema->dim_num();
1313 for (unsigned d = 0; d < dim_num; ++d) {
1314 auto dim_name = array_schema->dimension(d)->name();
1315 if (name == dim_name) {
1316 // Sanity check
1317 if (!array_domain->dimension(d)->var_size()) {
1318 std::string errmsg = "Cannot get non-empty domain; Dimension '";
1319 errmsg += dim_name + "' is fixed-sized";
1320 return logger_->status(Status::StorageManagerError(errmsg));
1321 }
1322
1323 if (!*is_empty) {
1324 std::memcpy(start, dom[d].start(), dom[d].start_size());
1325 std::memcpy(end, dom[d].end(), dom[d].end_size());
1326 }
1327
1328 return Status::Ok();
1329 }
1330 }
1331
1332 return logger_->status(Status::StorageManagerError(
1333 std::string("Cannot get non-empty domain; Dimension name '") + name +
1334 "' does not exist"));
1335 }
1336
array_get_encryption(const std::string & array_uri,EncryptionType * encryption_type)1337 Status StorageManager::array_get_encryption(
1338 const std::string& array_uri, EncryptionType* encryption_type) {
1339 URI uri(array_uri);
1340
1341 if (uri.is_invalid())
1342 return logger_->status(Status::StorageManagerError(
1343 "Cannot get array encryption; Invalid array URI"));
1344
1345 URI schema_uri;
1346 RETURN_NOT_OK(get_latest_array_schema_uri(uri, &schema_uri));
1347
1348 // Read tile header.
1349 GenericTileIO::GenericTileHeader header;
1350 RETURN_NOT_OK(
1351 GenericTileIO::read_generic_tile_header(this, schema_uri, 0, &header));
1352 *encryption_type = static_cast<EncryptionType>(header.encryption_type);
1353
1354 return Status::Ok();
1355 }
1356
array_xlock(const URI & array_uri)1357 Status StorageManager::array_xlock(const URI& array_uri) {
1358 // Get exclusive lock for threads
1359 xlock_mtx_.lock();
1360
1361 // Wait until the array is closed for reads
1362 std::unique_lock<std::mutex> lk(open_array_for_reads_mtx_);
1363 xlock_cv_.wait(lk, [this, array_uri] {
1364 return open_arrays_for_reads_.find(array_uri.to_string()) ==
1365 open_arrays_for_reads_.end();
1366 });
1367
1368 // Get exclusive lock for processes through a filelock
1369 filelock_t filelock = INVALID_FILELOCK;
1370 auto lock_uri = array_uri.join_path(constants::filelock_name);
1371 RETURN_NOT_OK_ELSE(
1372 vfs_->filelock_lock(lock_uri, &filelock, false), xlock_mtx_.unlock());
1373 xfilelocks_[array_uri.to_string()] = filelock;
1374
1375 return Status::Ok();
1376 }
1377
array_xunlock(const URI & array_uri)1378 Status StorageManager::array_xunlock(const URI& array_uri) {
1379 // Get filelock if it exists
1380 auto it = xfilelocks_.find(array_uri.to_string());
1381 if (it == xfilelocks_.end())
1382 return logger_->status(Status::StorageManagerError(
1383 "Cannot unlock array exclusive lock; Filelock not found"));
1384 auto filelock = it->second;
1385
1386 // Release exclusive lock for processes through the filelock
1387 auto lock_uri = array_uri.join_path(constants::filelock_name);
1388 if (filelock != INVALID_FILELOCK)
1389 RETURN_NOT_OK(vfs_->filelock_unlock(lock_uri));
1390 xfilelocks_.erase(it);
1391
1392 // Release exclusive lock for threads
1393 xlock_mtx_.unlock();
1394
1395 return Status::Ok();
1396 }
1397
async_push_query(Query * query)1398 Status StorageManager::async_push_query(Query* query) {
1399 cancelable_tasks_.execute(
1400 compute_tp_,
1401 [this, query]() {
1402 // Process query.
1403 Status st = query_submit(query);
1404 if (!st.ok())
1405 logger_->status(st);
1406 return st;
1407 },
1408 [query]() {
1409 // Task was cancelled. This is safe to perform in a separate thread,
1410 // as we are guaranteed by the thread pool not to have entered
1411 // query->process() yet.
1412 query->cancel();
1413 });
1414
1415 return Status::Ok();
1416 }
1417
cancel_all_tasks()1418 Status StorageManager::cancel_all_tasks() {
1419 // Check if there is already a "cancellation" in progress.
1420 bool handle_cancel = false;
1421 {
1422 std::unique_lock<std::mutex> lck(cancellation_in_progress_mtx_);
1423 if (!cancellation_in_progress_) {
1424 cancellation_in_progress_ = true;
1425 handle_cancel = true;
1426 }
1427 }
1428
1429 // Handle the cancellation.
1430 if (handle_cancel) {
1431 // Cancel any queued tasks.
1432 cancelable_tasks_.cancel_all_tasks();
1433
1434 // Only call VFS cancel if the object has been constructed
1435 if (vfs_ != nullptr)
1436 vfs_->cancel_all_tasks();
1437
1438 // Wait for in-progress queries to finish.
1439 wait_for_zero_in_progress();
1440
1441 // Reset the cancellation flag.
1442 std::unique_lock<std::mutex> lck(cancellation_in_progress_mtx_);
1443 cancellation_in_progress_ = false;
1444 }
1445
1446 return Status::Ok();
1447 }
1448
cancellation_in_progress()1449 bool StorageManager::cancellation_in_progress() {
1450 std::unique_lock<std::mutex> lck(cancellation_in_progress_mtx_);
1451 return cancellation_in_progress_;
1452 }
1453
config() const1454 const Config& StorageManager::config() const {
1455 return config_;
1456 }
1457
create_dir(const URI & uri)1458 Status StorageManager::create_dir(const URI& uri) {
1459 return vfs_->create_dir(uri);
1460 }
1461
is_dir(const URI & uri,bool * is_dir) const1462 Status StorageManager::is_dir(const URI& uri, bool* is_dir) const {
1463 return vfs_->is_dir(uri, is_dir);
1464 }
1465
touch(const URI & uri)1466 Status StorageManager::touch(const URI& uri) {
1467 return vfs_->touch(uri);
1468 }
1469
decrement_in_progress()1470 void StorageManager::decrement_in_progress() {
1471 std::unique_lock<std::mutex> lck(queries_in_progress_mtx_);
1472 queries_in_progress_--;
1473 queries_in_progress_cv_.notify_all();
1474 }
1475
object_remove(const char * path) const1476 Status StorageManager::object_remove(const char* path) const {
1477 auto uri = URI(path);
1478 if (uri.is_invalid())
1479 return logger_->status(Status::StorageManagerError(
1480 std::string("Cannot remove object '") + path + "'; Invalid URI"));
1481
1482 ObjectType obj_type;
1483 RETURN_NOT_OK(object_type(uri, &obj_type));
1484 if (obj_type == ObjectType::INVALID)
1485 return logger_->status(Status::StorageManagerError(
1486 std::string("Cannot remove object '") + path +
1487 "'; Invalid TileDB object"));
1488
1489 return vfs_->remove_dir(uri);
1490 }
1491
object_move(const char * old_path,const char * new_path) const1492 Status StorageManager::object_move(
1493 const char* old_path, const char* new_path) const {
1494 auto old_uri = URI(old_path);
1495 if (old_uri.is_invalid())
1496 return logger_->status(Status::StorageManagerError(
1497 std::string("Cannot move object '") + old_path + "'; Invalid URI"));
1498
1499 auto new_uri = URI(new_path);
1500 if (new_uri.is_invalid())
1501 return logger_->status(Status::StorageManagerError(
1502 std::string("Cannot move object to '") + new_path + "'; Invalid URI"));
1503
1504 ObjectType obj_type;
1505 RETURN_NOT_OK(object_type(old_uri, &obj_type));
1506 if (obj_type == ObjectType::INVALID)
1507 return logger_->status(Status::StorageManagerError(
1508 std::string("Cannot move object '") + old_path +
1509 "'; Invalid TileDB object"));
1510
1511 return vfs_->move_dir(old_uri, new_uri);
1512 }
1513
get_fragment_info(const Array & array,uint64_t timestamp_start,uint64_t timestamp_end,FragmentInfo * fragment_info,bool get_to_vacuum)1514 Status StorageManager::get_fragment_info(
1515 const Array& array,
1516 uint64_t timestamp_start,
1517 uint64_t timestamp_end,
1518 FragmentInfo* fragment_info,
1519 bool get_to_vacuum) {
1520 fragment_info->clear();
1521
1522 // Open array for reading
1523 auto array_schema = array.array_schema_latest();
1524 auto array_type = array_schema->array_type();
1525
1526 fragment_info->set_dim_info(
1527 array_schema->dim_names(), array_schema->dim_types());
1528
1529 // Get encryption key from config
1530 auto&& [st, array_schema_opt, array_schemas_opt, fragment_metadata_opt] =
1531 array_reopen(
1532 array.array_uri(),
1533 *array.encryption_key(),
1534 array_type == ArrayType::SPARSE ? timestamp_start : 0,
1535 timestamp_end);
1536
1537 if (!st.ok())
1538 return st;
1539
1540 // Return if array is empty
1541 if (fragment_metadata_opt.value().empty())
1542 return Status::Ok();
1543
1544 const auto& fragment_metadata = fragment_metadata_opt.value();
1545
1546 std::vector<uint64_t> sizes(fragment_metadata.size());
1547
1548 RETURN_NOT_OK(parallel_for(
1549 this->compute_tp_,
1550 0,
1551 fragment_metadata.size(),
1552 [&fragment_metadata, &sizes](uint64_t i) {
1553 const auto meta = fragment_metadata[i];
1554
1555 // Get fragment size
1556 uint64_t size;
1557 RETURN_NOT_OK(meta->fragment_size(&size));
1558 sizes[i] = size;
1559
1560 return Status::Ok();
1561 }));
1562
1563 for (uint64_t i = 0; i < fragment_metadata.size(); i++) {
1564 const auto meta = fragment_metadata[i];
1565 const auto& non_empty_domain = meta->non_empty_domain();
1566
1567 if (meta->timestamp_range().first < timestamp_start) {
1568 fragment_info->expand_anterior_ndrange(
1569 meta->array_schema()->domain(), non_empty_domain);
1570 } else {
1571 const auto& uri = meta->fragment_uri();
1572 bool sparse = !meta->dense();
1573
1574 // compute expanded non-empty domain (only for dense fragments)
1575 auto expanded_non_empty_domain = non_empty_domain;
1576 if (!sparse)
1577 meta->array_schema()->domain()->expand_to_tiles(
1578 &expanded_non_empty_domain);
1579
1580 // Push new fragment info
1581 fragment_info->append(SingleFragmentInfo(
1582 uri,
1583 sparse,
1584 meta->timestamp_range(),
1585 sizes[i],
1586 non_empty_domain,
1587 expanded_non_empty_domain,
1588 meta));
1589 }
1590 }
1591
1592 // Optionally get the URIs to vacuum
1593 if (get_to_vacuum) {
1594 std::vector<URI> to_vacuum, vac_uris, fragment_uris;
1595 URI meta_uri;
1596 RETURN_NOT_OK(
1597 get_fragment_uris(array.array_uri(), &fragment_uris, &meta_uri));
1598 RETURN_NOT_OK(get_uris_to_vacuum(
1599 fragment_uris, timestamp_start, timestamp_end, &to_vacuum, &vac_uris));
1600 fragment_info->set_to_vacuum(to_vacuum);
1601 }
1602
1603 return Status::Ok();
1604 }
1605
get_fragment_uris(const URI & array_uri,std::vector<URI> * fragment_uris,URI * meta_uri) const1606 Status StorageManager::get_fragment_uris(
1607 const URI& array_uri,
1608 std::vector<URI>* fragment_uris,
1609 URI* meta_uri) const {
1610 auto timer_se = stats_->start_timer("read_get_fragment_uris");
1611 // Get all uris in the array directory
1612 std::vector<URI> uris;
1613 RETURN_NOT_OK(vfs_->ls(array_uri.add_trailing_slash(), &uris));
1614
1615 // Get the fragments that have special "ok" URIs, which indicate
1616 // that fragments are "committed" for versions >= 5
1617 std::set<URI> ok_uris;
1618 for (size_t i = 0; i < uris.size(); ++i) {
1619 if (utils::parse::ends_with(
1620 uris[i].to_string(), constants::ok_file_suffix)) {
1621 auto name = uris[i].to_string();
1622 name = name.substr(0, name.size() - constants::ok_file_suffix.size());
1623 ok_uris.emplace(URI(name));
1624 }
1625 }
1626
1627 // Get only the committed fragment uris
1628 std::vector<int> is_fragment(uris.size(), 0);
1629 auto status = parallel_for(compute_tp_, 0, uris.size(), [&](size_t i) {
1630 if (utils::parse::starts_with(uris[i].last_path_part(), "."))
1631 return Status::Ok();
1632 RETURN_NOT_OK(this->is_fragment(uris[i], ok_uris, &is_fragment[i]));
1633 return Status::Ok();
1634 });
1635 RETURN_NOT_OK(status);
1636
1637 for (size_t i = 0; i < uris.size(); ++i) {
1638 if (is_fragment[i])
1639 fragment_uris->emplace_back(uris[i]);
1640 else if (this->is_vacuum_file(uris[i]))
1641 fragment_uris->emplace_back(uris[i]);
1642 }
1643
1644 // Get the latest consolidated fragment metadata URI
1645 RETURN_NOT_OK(get_consolidated_fragment_meta_uri(uris, meta_uri));
1646
1647 return Status::Ok();
1648 }
1649
tags() const1650 const std::unordered_map<std::string, std::string>& StorageManager::tags()
1651 const {
1652 return tags_;
1653 }
1654
get_fragment_info(const Array & array,const URI & fragment_uri,SingleFragmentInfo * fragment_info)1655 Status StorageManager::get_fragment_info(
1656 const Array& array,
1657 const URI& fragment_uri,
1658 SingleFragmentInfo* fragment_info) {
1659 // Get timestamp range
1660 std::pair<uint64_t, uint64_t> timestamp_range;
1661 RETURN_NOT_OK(
1662 utils::parse::get_timestamp_range(fragment_uri, ×tamp_range));
1663 uint32_t version;
1664 auto name = fragment_uri.remove_trailing_slash().last_path_part();
1665 RETURN_NOT_OK(utils::parse::get_fragment_name_version(name, &version));
1666
1667 // Check if fragment is sparse
1668 bool sparse = false;
1669 if (version == 1) { // This corresponds to format version <=2
1670 URI coords_uri =
1671 fragment_uri.join_path(constants::coords + constants::file_suffix);
1672 RETURN_NOT_OK(vfs_->is_file(coords_uri, &sparse));
1673 } else {
1674 // Do nothing. It does not matter what the `sparse` value
1675 // is, since the FragmentMetadata object will load the correct
1676 // value from the metadata file.
1677
1678 // Also `sparse` is updated below after loading the metadata
1679 }
1680
1681 // Get fragment non-empty domain
1682 auto meta = tdb_make_shared(
1683 FragmentMetadata,
1684 this,
1685 array.array_schema_latest(),
1686 fragment_uri,
1687 timestamp_range,
1688 !sparse);
1689 RETURN_NOT_OK(meta->load(
1690 *array.encryption_key(), nullptr, 0, array.array_schemas_all()));
1691
1692 // This is important for format version > 2
1693 sparse = !meta->dense();
1694
1695 // Get fragment size
1696 uint64_t size;
1697 RETURN_NOT_OK(meta->fragment_size(&size));
1698
1699 // Compute expanded non-empty domain only for dense fragments
1700 // Get non-empty domain, and compute expanded non-empty domain
1701 // (only for dense fragments)
1702 const auto& non_empty_domain = meta->non_empty_domain();
1703 auto expanded_non_empty_domain = non_empty_domain;
1704 if (!sparse)
1705 array.array_schema_latest()->domain()->expand_to_tiles(
1706 &expanded_non_empty_domain);
1707
1708 // Set fragment info
1709 *fragment_info = SingleFragmentInfo(
1710 fragment_uri,
1711 sparse,
1712 timestamp_range,
1713 size,
1714 non_empty_domain,
1715 expanded_non_empty_domain,
1716 meta);
1717
1718 return Status::Ok();
1719 }
1720
group_create(const std::string & group)1721 Status StorageManager::group_create(const std::string& group) {
1722 // Create group URI
1723 URI uri(group);
1724 if (uri.is_invalid())
1725 return logger_->status(Status::StorageManagerError(
1726 "Cannot create group '" + group + "'; Invalid group URI"));
1727
1728 // Check if group exists
1729 bool exists;
1730 RETURN_NOT_OK(is_group(uri, &exists));
1731 if (exists)
1732 return logger_->status(Status::StorageManagerError(
1733 std::string("Cannot create group; Group '") + uri.c_str() +
1734 "' already exists"));
1735
1736 std::lock_guard<std::mutex> lock{object_create_mtx_};
1737
1738 // Create group directory
1739 RETURN_NOT_OK(vfs_->create_dir(uri));
1740
1741 // Create group file
1742 URI group_filename = uri.join_path(constants::group_filename);
1743 Status st = vfs_->touch(group_filename);
1744 if (!st.ok()) {
1745 vfs_->remove_dir(uri);
1746 return st;
1747 }
1748 return st;
1749 }
1750
init(const Config * config)1751 Status StorageManager::init(const Config* config) {
1752 if (config != nullptr)
1753 config_ = *config;
1754
1755 // Get config params
1756 bool found = false;
1757 uint64_t tile_cache_size = 0;
1758 RETURN_NOT_OK(
1759 config_.get<uint64_t>("sm.tile_cache_size", &tile_cache_size, &found));
1760 assert(found);
1761
1762 tile_cache_ =
1763 tdb_unique_ptr<BufferLRUCache>(tdb_new(BufferLRUCache, tile_cache_size));
1764
1765 // GlobalState must be initialized before `vfs->init` because S3::init calls
1766 // GetGlobalState
1767 auto& global_state = global_state::GlobalState::GetGlobalState();
1768 RETURN_NOT_OK(global_state.init(config));
1769
1770 vfs_ = tdb_new(VFS);
1771 RETURN_NOT_OK(vfs_->init(stats_, compute_tp_, io_tp_, &config_, nullptr));
1772 #ifdef TILEDB_SERIALIZATION
1773 RETURN_NOT_OK(init_rest_client());
1774 #endif
1775
1776 RETURN_NOT_OK(set_default_tags());
1777
1778 global_state.register_storage_manager(this);
1779
1780 return Status::Ok();
1781 }
1782
compute_tp()1783 ThreadPool* StorageManager::compute_tp() {
1784 return compute_tp_;
1785 }
1786
io_tp()1787 ThreadPool* StorageManager::io_tp() {
1788 return io_tp_;
1789 }
1790
rest_client() const1791 RestClient* StorageManager::rest_client() const {
1792 return rest_client_.get();
1793 }
1794
increment_in_progress()1795 void StorageManager::increment_in_progress() {
1796 std::unique_lock<std::mutex> lck(queries_in_progress_mtx_);
1797 queries_in_progress_++;
1798 queries_in_progress_cv_.notify_all();
1799 }
1800
is_array(const URI & uri,bool * is_array) const1801 Status StorageManager::is_array(const URI& uri, bool* is_array) const {
1802 // Check if the schema directory exists or not
1803 bool is_dir = false;
1804 // Since is_dir could return NOT Ok status, we will not use RETURN_NOT_OK here
1805 Status st =
1806 vfs_->is_dir(uri.join_path(constants::array_schema_folder_name), &is_dir);
1807 if (st.ok() && is_dir) {
1808 *is_array = true;
1809 return Status::Ok();
1810 }
1811
1812 // If there is no schema directory, we check schema file
1813 RETURN_NOT_OK(
1814 vfs_->is_file(uri.join_path(constants::array_schema_filename), is_array));
1815 return Status::Ok();
1816 }
1817
is_file(const URI & uri,bool * is_file) const1818 Status StorageManager::is_file(const URI& uri, bool* is_file) const {
1819 RETURN_NOT_OK(vfs_->is_file(uri, is_file));
1820 return Status::Ok();
1821 }
1822
is_fragment(const URI & uri,const std::set<URI> & ok_uris,int * is_fragment) const1823 Status StorageManager::is_fragment(
1824 const URI& uri, const std::set<URI>& ok_uris, int* is_fragment) const {
1825 // If the URI name has a suffix, then it is not a fragment
1826 auto name = uri.remove_trailing_slash().last_path_part();
1827 if (name.find_first_of('.') != std::string::npos) {
1828 *is_fragment = 0;
1829 return Status::Ok();
1830 }
1831
1832 // Check set membership in ok_uris
1833 if (ok_uris.find(uri) != ok_uris.end()) {
1834 *is_fragment = 1;
1835 return Status::Ok();
1836 }
1837
1838 // If the format version is >= 5, then the above suffices to check if
1839 // the URI is indeed a fragment
1840 uint32_t version;
1841 RETURN_NOT_OK(utils::parse::get_fragment_version(name, &version));
1842 if (version != UINT32_MAX && version >= 5) {
1843 *is_fragment = false;
1844 return Status::Ok();
1845 }
1846
1847 // Versions < 5
1848 bool is_file;
1849 RETURN_NOT_OK(vfs_->is_file(
1850 uri.join_path(constants::fragment_metadata_filename), &is_file));
1851 *is_fragment = (int)is_file;
1852 return Status::Ok();
1853 }
1854
is_group(const URI & uri,bool * is_group) const1855 Status StorageManager::is_group(const URI& uri, bool* is_group) const {
1856 RETURN_NOT_OK(
1857 vfs_->is_file(uri.join_path(constants::group_filename), is_group));
1858 return Status::Ok();
1859 }
1860
is_vacuum_file(const URI & uri) const1861 bool StorageManager::is_vacuum_file(const URI& uri) const {
1862 // If the URI name has a suffix, then it is not a fragment
1863 if (utils::parse::ends_with(uri.to_string(), constants::vacuum_file_suffix))
1864 return true;
1865
1866 return false;
1867 }
1868
get_array_schema_uris(const URI & array_uri,std::vector<URI> * schema_uris) const1869 Status StorageManager::get_array_schema_uris(
1870 const URI& array_uri, std::vector<URI>* schema_uris) const {
1871 auto timer_se = stats_->start_timer("read_get_array_schema_uris");
1872
1873 schema_uris->clear();
1874 URI old_schema_uri = array_uri.join_path(constants::array_schema_filename);
1875 bool has_file = false;
1876 RETURN_NOT_OK(vfs_->is_file(old_schema_uri, &has_file));
1877 if (has_file) {
1878 schema_uris->push_back(old_schema_uri);
1879 }
1880
1881 URI schema_folder_uri =
1882 array_uri.join_path(constants::array_schema_folder_name);
1883 // Check if schema_folder_uri exists. For some file systems, such as win, ls
1884 // will return error if the folder does not exist.
1885 bool has_dir = false;
1886 RETURN_NOT_OK(vfs_->is_dir(schema_folder_uri, &has_dir));
1887 if (has_dir) {
1888 std::vector<URI> array_schema_uris;
1889 RETURN_NOT_OK(vfs_->ls(schema_folder_uri, &array_schema_uris));
1890 if (array_schema_uris.size() > 0) {
1891 schema_uris->reserve(schema_uris->size() + array_schema_uris.size());
1892 std::copy(
1893 array_schema_uris.begin(),
1894 array_schema_uris.end(),
1895 std::back_inserter(*schema_uris));
1896 }
1897 }
1898
1899 // Check if schema_uris is empty
1900 if (schema_uris->empty()) {
1901 return logger_->status(Status::StorageManagerError(
1902 "Can not get the array schemas; No array schemas found."));
1903 }
1904
1905 return Status::Ok();
1906 }
1907
get_latest_array_schema_uri(const URI & array_uri,URI * uri) const1908 Status StorageManager::get_latest_array_schema_uri(
1909 const URI& array_uri, URI* uri) const {
1910 auto timer_se = stats_->start_timer("read_get_latest_array_schema_uri");
1911
1912 std::vector<URI> schema_uris;
1913 RETURN_NOT_OK(get_array_schema_uris(array_uri, &schema_uris));
1914 if (schema_uris.size() == 0) {
1915 return logger_->status(Status::StorageManagerError(
1916 "Can not get the latest array schema; No array schemas found."));
1917 }
1918 *uri = schema_uris.back();
1919 if (uri->is_invalid()) {
1920 return logger_->status(
1921 Status::StorageManagerError("Could not find array schema URI"));
1922 }
1923
1924 return Status::Ok();
1925 }
1926
load_array_schema_from_uri(const URI & schema_uri,const EncryptionKey & encryption_key,ArraySchema ** array_schema)1927 Status StorageManager::load_array_schema_from_uri(
1928 const URI& schema_uri,
1929 const EncryptionKey& encryption_key,
1930 ArraySchema** array_schema) {
1931 auto timer_se = stats_->start_timer("read_load_array_schema_from_uri");
1932
1933 GenericTileIO tile_io(this, schema_uri);
1934 Tile* tile = nullptr;
1935
1936 // Get encryption key from config
1937 if (encryption_key.encryption_type() == EncryptionType::NO_ENCRYPTION) {
1938 bool found = false;
1939 std::string encryption_key_from_cfg =
1940 config_.get("sm.encryption_key", &found);
1941 assert(found);
1942 std::string encryption_type_from_cfg =
1943 config_.get("sm.encryption_type", &found);
1944 assert(found);
1945 auto [st, etc] = encryption_type_enum(encryption_type_from_cfg);
1946 RETURN_NOT_OK(st);
1947 EncryptionType encryption_type_cfg = etc.value();
1948
1949 EncryptionKey encryption_key_cfg;
1950 if (encryption_key_from_cfg.empty()) {
1951 RETURN_NOT_OK(
1952 encryption_key_cfg.set_key(encryption_type_cfg, nullptr, 0));
1953 } else {
1954 uint32_t key_length = 0;
1955 if (EncryptionKey::is_valid_key_length(
1956 encryption_type_cfg,
1957 static_cast<uint32_t>(encryption_key_from_cfg.size()))) {
1958 const UnitTestConfig& unit_test_cfg = UnitTestConfig::instance();
1959 if (unit_test_cfg.array_encryption_key_length.is_set()) {
1960 key_length = unit_test_cfg.array_encryption_key_length.get();
1961 } else {
1962 key_length = static_cast<uint32_t>(encryption_key_from_cfg.size());
1963 }
1964 }
1965 RETURN_NOT_OK(encryption_key_cfg.set_key(
1966 encryption_type_cfg,
1967 (const void*)encryption_key_from_cfg.c_str(),
1968 key_length));
1969 }
1970 RETURN_NOT_OK(tile_io.read_generic(&tile, 0, encryption_key_cfg, config_));
1971 } else {
1972 RETURN_NOT_OK(tile_io.read_generic(&tile, 0, encryption_key, config_));
1973 }
1974
1975 auto buffer = tile->buffer();
1976 Buffer buff;
1977 buff.realloc(buffer->size());
1978 buff.set_size(buffer->size());
1979 RETURN_NOT_OK_ELSE(buffer->read(buff.data(), buff.size()), tdb_delete(tile));
1980 tdb_delete(tile);
1981
1982 stats_->add_counter("read_array_schema_size", buff.size());
1983
1984 // Deserialize
1985 ConstBuffer cbuff(&buff);
1986 *array_schema = tdb_new(ArraySchema);
1987 Status st = (*array_schema)->deserialize(&cbuff);
1988 if (!st.ok()) {
1989 tdb_delete(*array_schema);
1990 *array_schema = nullptr;
1991 return st;
1992 }
1993 (*array_schema)->set_uri(schema_uri);
1994 return st;
1995 }
1996
load_array_schema(const URI & array_uri,const EncryptionKey & encryption_key,ArraySchema ** array_schema)1997 Status StorageManager::load_array_schema(
1998 const URI& array_uri,
1999 const EncryptionKey& encryption_key,
2000 ArraySchema** array_schema) {
2001 auto timer_se = stats_->start_timer("read_load_array_schema");
2002
2003 if (array_uri.is_invalid())
2004 return logger_->status(Status::StorageManagerError(
2005 "Cannot load array schema; Invalid array URI"));
2006
2007 URI schema_uri;
2008 RETURN_NOT_OK(get_latest_array_schema_uri(array_uri, &schema_uri));
2009
2010 RETURN_NOT_OK(
2011 load_array_schema_from_uri(schema_uri, encryption_key, array_schema));
2012 (*array_schema)->set_array_uri(array_uri);
2013 return Status::Ok();
2014 }
2015
2016 std::tuple<
2017 Status,
2018 std::optional<std::unordered_map<std::string, tdb_shared_ptr<ArraySchema>>>>
load_all_array_schemas(const URI & array_uri,const EncryptionKey & encryption_key)2019 StorageManager::load_all_array_schemas(
2020 const URI& array_uri, const EncryptionKey& encryption_key) {
2021 auto timer_se = stats_->start_timer("read_load_all_array_schemas");
2022
2023 if (array_uri.is_invalid())
2024 return {logger_->status(Status::StorageManagerError(
2025 "Cannot load all array schemas; Invalid array URI")),
2026 std::nullopt};
2027
2028 std::vector<URI> schema_uris;
2029 RETURN_NOT_OK_TUPLE(get_array_schema_uris(array_uri, &schema_uris));
2030 if (schema_uris.empty()) {
2031 return {
2032 logger_->status(Status::StorageManagerError(
2033 "Can not get the array schema vector; No array schemas found.")),
2034 std::nullopt};
2035 }
2036
2037 std::vector<ArraySchema*> schema_vector;
2038 auto schema_num = schema_uris.size();
2039 schema_vector.resize(schema_num);
2040
2041 auto status =
2042 parallel_for(compute_tp_, 0, schema_num, [&](size_t schema_ith) {
2043 auto& schema_uri = schema_uris[schema_ith];
2044 auto array_schema = (ArraySchema*)nullptr;
2045 RETURN_NOT_OK(load_array_schema_from_uri(
2046 schema_uri, encryption_key, &array_schema));
2047 schema_vector[schema_ith] = array_schema;
2048 return Status::Ok();
2049 });
2050 RETURN_NOT_OK_TUPLE(status);
2051
2052 std::unordered_map<std::string, tdb_shared_ptr<ArraySchema>> array_schemas;
2053 for (const auto& array_schema : schema_vector) {
2054 array_schemas[array_schema->name()] =
2055 tdb_shared_ptr<ArraySchema>(array_schema);
2056 }
2057
2058 return {Status::Ok(), array_schemas};
2059 }
2060
load_array_metadata(const URI & array_uri,const EncryptionKey & encryption_key,uint64_t timestamp_start,uint64_t timestamp_end,Metadata * metadata)2061 Status StorageManager::load_array_metadata(
2062 const URI& array_uri,
2063 const EncryptionKey& encryption_key,
2064 uint64_t timestamp_start,
2065 uint64_t timestamp_end,
2066 Metadata* metadata) {
2067 auto timer_se = stats_->start_timer("read_load_array_meta");
2068
2069 OpenArray* open_array;
2070 // Lock mutex
2071 {
2072 std::lock_guard<std::mutex> lock{open_array_for_reads_mtx_};
2073
2074 // Find the open array entry
2075 auto it = open_arrays_for_reads_.find(array_uri.to_string());
2076 assert(it != open_arrays_for_reads_.end());
2077 open_array = it->second;
2078
2079 // Lock the array
2080 open_array->mtx_lock();
2081 }
2082
2083 // Determine which array metadata to load
2084 std::vector<TimestampedURI> array_metadata_to_load;
2085 std::vector<URI> array_metadata_uris;
2086 RETURN_NOT_OK_ELSE(
2087 get_array_metadata_uris(array_uri, &array_metadata_uris),
2088 open_array->mtx_unlock());
2089 RETURN_NOT_OK_ELSE(
2090 get_sorted_uris(
2091 array_metadata_uris,
2092 &array_metadata_to_load,
2093 timestamp_start,
2094 timestamp_end),
2095 open_array->mtx_unlock());
2096
2097 // Get the array metadata
2098 RETURN_NOT_OK_ELSE(
2099 load_array_metadata(
2100 open_array, encryption_key, array_metadata_to_load, metadata),
2101 open_array->mtx_unlock());
2102
2103 // Unlock the array
2104 open_array->mtx_unlock();
2105
2106 return Status::Ok();
2107 }
2108
object_type(const URI & uri,ObjectType * type) const2109 Status StorageManager::object_type(const URI& uri, ObjectType* type) const {
2110 URI dir_uri = uri;
2111 if (uri.is_s3() || uri.is_azure() || uri.is_gcs()) {
2112 // Always add a trailing '/' in the S3/Azure/GCS case so that listing the
2113 // URI as a directory will work as expected. Listing a non-directory object
2114 // is not an error for S3/Azure/GCS.
2115 auto uri_str = uri.to_string();
2116 dir_uri =
2117 URI(utils::parse::ends_with(uri_str, "/") ? uri_str : (uri_str + "/"));
2118 } else {
2119 // For non public cloud backends, listing a non-directory is an error.
2120 bool is_dir = false;
2121 RETURN_NOT_OK(vfs_->is_dir(uri, &is_dir));
2122 if (!is_dir) {
2123 *type = ObjectType::INVALID;
2124 return Status::Ok();
2125 }
2126 }
2127
2128 std::vector<URI> child_uris;
2129 RETURN_NOT_OK(vfs_->ls(dir_uri, &child_uris));
2130
2131 for (const auto& child_uri : child_uris) {
2132 auto uri_str = child_uri.to_string();
2133 if (utils::parse::ends_with(uri_str, constants::group_filename)) {
2134 *type = ObjectType::GROUP;
2135 return Status::Ok();
2136 } else if (utils::parse::ends_with(
2137 uri_str, constants::array_schema_filename)) {
2138 *type = ObjectType::ARRAY;
2139 return Status::Ok();
2140 } else if (utils::parse::ends_with(
2141 uri_str, constants::array_schema_folder_name)) {
2142 *type = ObjectType::ARRAY;
2143 return Status::Ok();
2144 }
2145 }
2146
2147 *type = ObjectType::INVALID;
2148 return Status::Ok();
2149 }
2150
object_iter_begin(ObjectIter ** obj_iter,const char * path,WalkOrder order)2151 Status StorageManager::object_iter_begin(
2152 ObjectIter** obj_iter, const char* path, WalkOrder order) {
2153 // Sanity check
2154 URI path_uri(path);
2155 if (path_uri.is_invalid()) {
2156 return logger_->status(Status::StorageManagerError(
2157 "Cannot create object iterator; Invalid input path"));
2158 }
2159
2160 // Get all contents of path
2161 std::vector<URI> uris;
2162 RETURN_NOT_OK(vfs_->ls(path_uri, &uris));
2163
2164 // Create a new object iterator
2165 *obj_iter = tdb_new(ObjectIter);
2166 (*obj_iter)->order_ = order;
2167 (*obj_iter)->recursive_ = true;
2168
2169 // Include the uris that are TileDB objects in the iterator state
2170 ObjectType obj_type;
2171 for (auto& uri : uris) {
2172 RETURN_NOT_OK_ELSE(object_type(uri, &obj_type), tdb_delete(*obj_iter));
2173 if (obj_type != ObjectType::INVALID) {
2174 (*obj_iter)->objs_.push_back(uri);
2175 if (order == WalkOrder::POSTORDER)
2176 (*obj_iter)->expanded_.push_back(false);
2177 }
2178 }
2179
2180 return Status::Ok();
2181 }
2182
object_iter_begin(ObjectIter ** obj_iter,const char * path)2183 Status StorageManager::object_iter_begin(
2184 ObjectIter** obj_iter, const char* path) {
2185 // Sanity check
2186 URI path_uri(path);
2187 if (path_uri.is_invalid()) {
2188 return logger_->status(Status::StorageManagerError(
2189 "Cannot create object iterator; Invalid input path"));
2190 }
2191
2192 // Get all contents of path
2193 std::vector<URI> uris;
2194 RETURN_NOT_OK(vfs_->ls(path_uri, &uris));
2195
2196 // Create a new object iterator
2197 *obj_iter = tdb_new(ObjectIter);
2198 (*obj_iter)->order_ = WalkOrder::PREORDER;
2199 (*obj_iter)->recursive_ = false;
2200
2201 // Include the uris that are TileDB objects in the iterator state
2202 ObjectType obj_type;
2203 for (auto& uri : uris) {
2204 RETURN_NOT_OK(object_type(uri, &obj_type));
2205 if (obj_type != ObjectType::INVALID)
2206 (*obj_iter)->objs_.push_back(uri);
2207 }
2208
2209 return Status::Ok();
2210 }
2211
object_iter_free(ObjectIter * obj_iter)2212 void StorageManager::object_iter_free(ObjectIter* obj_iter) {
2213 tdb_delete(obj_iter);
2214 }
2215
object_iter_next(ObjectIter * obj_iter,const char ** path,ObjectType * type,bool * has_next)2216 Status StorageManager::object_iter_next(
2217 ObjectIter* obj_iter, const char** path, ObjectType* type, bool* has_next) {
2218 // Handle case there is no next
2219 if (obj_iter->objs_.empty()) {
2220 *has_next = false;
2221 return Status::Ok();
2222 }
2223
2224 // Retrieve next object
2225 switch (obj_iter->order_) {
2226 case WalkOrder::PREORDER:
2227 RETURN_NOT_OK(object_iter_next_preorder(obj_iter, path, type, has_next));
2228 break;
2229 case WalkOrder::POSTORDER:
2230 RETURN_NOT_OK(object_iter_next_postorder(obj_iter, path, type, has_next));
2231 break;
2232 }
2233
2234 return Status::Ok();
2235 }
2236
/**
 * Post-order step of the object iterator.
 *
 * If the front entry has not been expanded yet, repeatedly lists the current
 * front entry and pushes its TileDB-object children onto the front of the
 * list (preserving listing order), marking each listed entry as expanded.
 * This stops once a listing adds no children. The front entry is then
 * emitted and popped.
 *
 * objs_ and expanded_ are kept in lock-step: every push/pop on one is
 * mirrored on the other.
 *
 * @param obj_iter The iterator (objs_ must be non-empty).
 * @param path Set to the emitted path (points into obj_iter->next_).
 * @param type Set to the emitted object's type.
 * @param has_next Set to true.
 * @return Status
 */
Status StorageManager::object_iter_next_postorder(
    ObjectIter* obj_iter, const char** path, ObjectType* type, bool* has_next) {
  // Get all contents of the next URI recursively till the bottom,
  // if the front of the list has not been expanded
  if (obj_iter->expanded_.front() == false) {
    uint64_t obj_num;
    do {
      // Remember the list size so we can tell whether this level added
      // any children; if it did not, the front is a leaf and we stop.
      obj_num = obj_iter->objs_.size();
      std::vector<URI> uris;
      RETURN_NOT_OK(vfs_->ls(obj_iter->objs_.front(), &uris));
      obj_iter->expanded_.front() = true;

      // Push the new TileDB objects in the front of the iterator's list.
      // Iterating in reverse keeps them in listing order after push_front.
      ObjectType obj_type;
      for (auto it = uris.rbegin(); it != uris.rend(); ++it) {
        RETURN_NOT_OK(object_type(*it, &obj_type));
        if (obj_type != ObjectType::INVALID) {
          obj_iter->objs_.push_front(*it);
          obj_iter->expanded_.push_front(false);
        }
      }
    } while (obj_num != obj_iter->objs_.size());
  }

  // Prepare the values to be returned
  URI front_uri = obj_iter->objs_.front();
  obj_iter->next_ = front_uri.to_string();
  RETURN_NOT_OK(object_type(front_uri, type));
  *path = obj_iter->next_.c_str();
  *has_next = true;

  // Pop the front (next URI) of the iterator's object list
  obj_iter->objs_.pop_front();
  obj_iter->expanded_.pop_front();

  return Status::Ok();
}
2274
object_iter_next_preorder(ObjectIter * obj_iter,const char ** path,ObjectType * type,bool * has_next)2275 Status StorageManager::object_iter_next_preorder(
2276 ObjectIter* obj_iter, const char** path, ObjectType* type, bool* has_next) {
2277 // Prepare the values to be returned
2278 URI front_uri = obj_iter->objs_.front();
2279 obj_iter->next_ = front_uri.to_string();
2280 RETURN_NOT_OK(object_type(front_uri, type));
2281 *path = obj_iter->next_.c_str();
2282 *has_next = true;
2283
2284 // Pop the front (next URI) of the iterator's object list
2285 obj_iter->objs_.pop_front();
2286
2287 // Return if no recursion is needed
2288 if (!obj_iter->recursive_)
2289 return Status::Ok();
2290
2291 // Get all contents of the next URI
2292 std::vector<URI> uris;
2293 RETURN_NOT_OK(vfs_->ls(front_uri, &uris));
2294
2295 // Push the new TileDB objects in the front of the iterator's list
2296 ObjectType obj_type;
2297 for (auto it = uris.rbegin(); it != uris.rend(); ++it) {
2298 RETURN_NOT_OK(object_type(*it, &obj_type));
2299 if (obj_type != ObjectType::INVALID)
2300 obj_iter->objs_.push_front(*it);
2301 }
2302
2303 return Status::Ok();
2304 }
2305
query_submit(Query * query)2306 Status StorageManager::query_submit(Query* query) {
2307 // Process the query
2308 QueryInProgress in_progress(this);
2309 auto st = query->process();
2310
2311 return st;
2312 }
2313
query_submit_async(Query * query)2314 Status StorageManager::query_submit_async(Query* query) {
2315 // Push the query into the async queue
2316 return async_push_query(query);
2317 }
2318
read_from_cache(const URI & uri,uint64_t offset,Buffer * buffer,uint64_t nbytes,bool * in_cache) const2319 Status StorageManager::read_from_cache(
2320 const URI& uri,
2321 uint64_t offset,
2322 Buffer* buffer,
2323 uint64_t nbytes,
2324 bool* in_cache) const {
2325 std::stringstream key;
2326 key << uri.to_string() << "+" << offset;
2327 RETURN_NOT_OK(tile_cache_->read(key.str(), buffer, 0, nbytes, in_cache));
2328 buffer->set_size(nbytes);
2329 buffer->reset_offset();
2330
2331 return Status::Ok();
2332 }
2333
read(const URI & uri,uint64_t offset,Buffer * buffer,uint64_t nbytes) const2334 Status StorageManager::read(
2335 const URI& uri, uint64_t offset, Buffer* buffer, uint64_t nbytes) const {
2336 RETURN_NOT_OK(buffer->realloc(nbytes));
2337 RETURN_NOT_OK(vfs_->read(uri, offset, buffer->data(), nbytes));
2338 buffer->set_size(nbytes);
2339 buffer->reset_offset();
2340
2341 return Status::Ok();
2342 }
2343
read(const URI & uri,uint64_t offset,void * buffer,uint64_t nbytes) const2344 Status StorageManager::read(
2345 const URI& uri, uint64_t offset, void* buffer, uint64_t nbytes) const {
2346 RETURN_NOT_OK(vfs_->read(uri, offset, buffer, nbytes));
2347 return Status::Ok();
2348 }
2349
set_tag(const std::string & key,const std::string & value)2350 Status StorageManager::set_tag(
2351 const std::string& key, const std::string& value) {
2352 tags_[key] = value;
2353
2354 // Tags are added to REST requests as HTTP headers.
2355 if (rest_client_ != nullptr)
2356 RETURN_NOT_OK(rest_client_->set_header(key, value));
2357
2358 return Status::Ok();
2359 }
2360
store_array_schema(ArraySchema * array_schema,const EncryptionKey & encryption_key)2361 Status StorageManager::store_array_schema(
2362 ArraySchema* array_schema, const EncryptionKey& encryption_key) {
2363 const URI schema_uri = array_schema->uri();
2364
2365 // Serialize
2366 Buffer buff;
2367 RETURN_NOT_OK(array_schema->serialize(&buff));
2368
2369 stats_->add_counter("write_array_schema_size", buff.size());
2370
2371 // Delete file if it exists already
2372 bool exists;
2373 RETURN_NOT_OK(is_file(schema_uri, &exists));
2374 if (exists)
2375 RETURN_NOT_OK(vfs_->remove_file(schema_uri));
2376
2377 // Check if the array schema directory exists
2378 // If not create it, this is caused by a pre-v10 array
2379 bool schema_dir_exists = false;
2380 URI array_schema_folder_uri =
2381 array_schema->array_uri().join_path(constants::array_schema_folder_name);
2382 RETURN_NOT_OK(is_dir(array_schema_folder_uri, &schema_dir_exists));
2383 if (!schema_dir_exists)
2384 RETURN_NOT_OK(create_dir(array_schema_folder_uri));
2385
2386 // Write to file
2387 Tile tile(
2388 constants::generic_tile_datatype,
2389 constants::generic_tile_cell_size,
2390 0,
2391 &buff,
2392 false);
2393
2394 GenericTileIO tile_io(this, schema_uri);
2395 uint64_t nbytes;
2396 Status st = tile_io.write_generic(&tile, encryption_key, &nbytes);
2397 (void)nbytes;
2398 if (st.ok())
2399 st = close_file(schema_uri);
2400
2401 buff.clear();
2402
2403 return st;
2404 }
2405
store_array_metadata(const URI & array_uri,const EncryptionKey & encryption_key,Metadata * array_metadata)2406 Status StorageManager::store_array_metadata(
2407 const URI& array_uri,
2408 const EncryptionKey& encryption_key,
2409 Metadata* array_metadata) {
2410 auto timer_se = stats_->start_timer("write_array_meta");
2411
2412 // Trivial case
2413 if (array_metadata == nullptr)
2414 return Status::Ok();
2415
2416 // Serialize array metadata
2417 Buffer metadata_buff;
2418 RETURN_NOT_OK(array_metadata->serialize(&metadata_buff));
2419
2420 // Do nothing if there are no metadata to write
2421 if (metadata_buff.size() == 0)
2422 return Status::Ok();
2423
2424 stats_->add_counter("write_array_meta_size", metadata_buff.size());
2425
2426 // Create a metadata file name
2427 URI array_metadata_uri;
2428 RETURN_NOT_OK(array_metadata->get_uri(array_uri, &array_metadata_uri));
2429
2430 Tile tile(
2431 constants::generic_tile_datatype,
2432 constants::generic_tile_cell_size,
2433 0,
2434 &metadata_buff,
2435 false);
2436
2437 GenericTileIO tile_io(this, array_metadata_uri);
2438 uint64_t nbytes;
2439 Status st = tile_io.write_generic(&tile, encryption_key, &nbytes);
2440 (void)nbytes;
2441
2442 if (st.ok()) {
2443 st = close_file(array_metadata_uri);
2444 }
2445
2446 metadata_buff.clear();
2447
2448 return st;
2449 }
2450
close_file(const URI & uri)2451 Status StorageManager::close_file(const URI& uri) {
2452 return vfs_->close_file(uri);
2453 }
2454
sync(const URI & uri)2455 Status StorageManager::sync(const URI& uri) {
2456 return vfs_->sync(uri);
2457 }
2458
vfs() const2459 VFS* StorageManager::vfs() const {
2460 return vfs_;
2461 }
2462
wait_for_zero_in_progress()2463 void StorageManager::wait_for_zero_in_progress() {
2464 std::unique_lock<std::mutex> lck(queries_in_progress_mtx_);
2465 queries_in_progress_cv_.wait(
2466 lck, [this]() { return queries_in_progress_ == 0; });
2467 }
2468
init_rest_client()2469 Status StorageManager::init_rest_client() {
2470 const char* server_address;
2471 RETURN_NOT_OK(config_.get("rest.server_address", &server_address));
2472 if (server_address != nullptr) {
2473 rest_client_.reset(tdb_new(RestClient));
2474 RETURN_NOT_OK(rest_client_->init(stats_, &config_, compute_tp_));
2475 }
2476
2477 return Status::Ok();
2478 }
2479
write_to_cache(const URI & uri,uint64_t offset,Buffer * buffer) const2480 Status StorageManager::write_to_cache(
2481 const URI& uri, uint64_t offset, Buffer* buffer) const {
2482 // Do not write metadata or array schema to cache
2483 std::string filename = uri.last_path_part();
2484 std::string uri_str = uri.to_string();
2485 if (filename == constants::fragment_metadata_filename ||
2486 (uri_str.find(constants::array_schema_folder_name) !=
2487 std::string::npos) ||
2488 filename == constants::array_schema_filename) {
2489 return Status::Ok();
2490 }
2491
2492 // Generate key (uri + offset)
2493 std::stringstream key;
2494 key << uri.to_string() << "+" << offset;
2495
2496 // Insert to cache
2497 Buffer cached_buffer;
2498 RETURN_NOT_OK(cached_buffer.write(buffer->data(), buffer->size()));
2499 RETURN_NOT_OK(
2500 tile_cache_->insert(key.str(), std::move(cached_buffer), false));
2501
2502 return Status::Ok();
2503 }
2504
write(const URI & uri,Buffer * buffer) const2505 Status StorageManager::write(const URI& uri, Buffer* buffer) const {
2506 return vfs_->write(uri, buffer->data(), buffer->size());
2507 }
2508
write(const URI & uri,void * data,uint64_t size) const2509 Status StorageManager::write(const URI& uri, void* data, uint64_t size) const {
2510 return vfs_->write(uri, data, size);
2511 }
2512
stats()2513 stats::Stats* StorageManager::stats() {
2514 return stats_;
2515 }
2516
logger() const2517 tdb_shared_ptr<Logger> StorageManager::logger() const {
2518 return logger_;
2519 }
2520
2521 /* ****************************** */
2522 /* PRIVATE METHODS */
2523 /* ****************************** */
2524
/**
 * Opens `array_uri` for reads without loading any fragment metadata.
 *
 * Looks up (or creates and registers) the OpenArray entry in
 * open_arrays_for_reads_ and applies `encryption_key` to it. It then locks
 * the entry's mutex, increments its counter, acquires a shared file lock via
 * file_lock(vfs_), and finally loads the array schemas.
 *
 * On success *open_array is returned with its mutex still locked.
 * NOTE(review): presumably the caller is responsible for unlocking it —
 * confirm. If the file lock or schema load fails, the mutex is unlocked and
 * array_close_for_reads is called before returning.
 *
 * NOTE(review): if set_encryption_key fails for a newly created entry,
 * *open_array is deleted but not reset to nullptr, so the caller receives a
 * dangling pointer alongside the error status.
 *
 * @param array_uri URI of the array to open.
 * @param encryption_key Key to set/check on the open array.
 * @param open_array Set to the (new or existing) open array entry.
 * @return Status
 */
Status StorageManager::array_open_without_fragments(
    const URI& array_uri,
    const EncryptionKey& encryption_key,
    OpenArray** open_array) {
  auto timer_se = stats_->start_timer("read_array_open_without_fragments");
  if (!vfs_->supports_uri_scheme(array_uri))
    return logger_->status(
        Status::StorageManagerError("Cannot open array; "
                                    "URI scheme "
                                    "unsupported."));

  // Lock mutexes (the open-arrays map mutex and xlock_mtx_) for the
  // duration of the lookup/registration below.
  {
    std::lock_guard<std::mutex> lock{open_array_for_reads_mtx_};
    std::lock_guard<std::mutex> xlock{xlock_mtx_};

    // Find the open array entry and check encryption key
    auto it = open_arrays_for_reads_.find(array_uri.to_string());
    if (it != open_arrays_for_reads_.end()) {
      RETURN_NOT_OK(it->second->set_encryption_key(encryption_key));
      *open_array = it->second;
    } else {  // Create a new entry
      *open_array = tdb_new(OpenArray, array_uri, QueryType::READ);
      RETURN_NOT_OK_ELSE(
          (*open_array)->set_encryption_key(encryption_key),
          tdb_delete(*open_array));
      open_arrays_for_reads_[array_uri.to_string()] = *open_array;
    }
    // Lock the array and increment counter
    (*open_array)->mtx_lock();
    (*open_array)->cnt_incr();
  }

  // Acquire a shared filelock
  auto st = (*open_array)->file_lock(vfs_);
  if (!st.ok()) {
    (*open_array)->mtx_unlock();
    array_close_for_reads(array_uri);
    return st;
  }

  // When opening the array we want to always reload the schema
  // Fragments are always relisted, now with schema evolution we
  // need to make sure we list out any new schemas too.
  // Fragment listing is significantly slower than listing schemas
  // and they happen in parallel so this is low impact
  // NOTE(review): the private load_array_schema(array_uri, OpenArray*, key)
  // returns early when the open array already has a latest schema, so an
  // already-open array does NOT reload its schemas here, despite the
  // comment above. Confirm which behavior is intended.
  st = load_array_schema(array_uri, *open_array, encryption_key);
  if (!st.ok()) {
    (*open_array)->mtx_unlock();
    array_close_for_reads(array_uri);
    return st;
  }

  return Status::Ok();
}
2580
get_array_metadata_uris(const URI & array_uri,std::vector<URI> * array_metadata_uris) const2581 Status StorageManager::get_array_metadata_uris(
2582 const URI& array_uri, std::vector<URI>* array_metadata_uris) const {
2583 // Get all uris in the array metadata directory
2584 URI metadata_dir = array_uri.join_path(constants::array_metadata_folder_name);
2585 std::vector<URI> uris;
2586
2587 bool is_dir;
2588 RETURN_NOT_OK(vfs_->is_dir(metadata_dir, &is_dir));
2589 if (!is_dir)
2590 return Status::Ok();
2591
2592 RETURN_NOT_OK(vfs_->ls(metadata_dir.add_trailing_slash(), &uris));
2593
2594 // Get only the metadata uris
2595 for (auto& uri : uris) {
2596 auto uri_last_path = uri.last_path_part();
2597 if (utils::parse::starts_with(uri_last_path, "."))
2598 continue;
2599
2600 if (utils::parse::starts_with(uri_last_path, "__") &&
2601 !utils::parse::ends_with(uri_last_path, constants::vacuum_file_suffix))
2602 array_metadata_uris->push_back(uri);
2603 }
2604
2605 return Status::Ok();
2606 }
2607
load_array_schema(const URI & array_uri,OpenArray * open_array,const EncryptionKey & encryption_key)2608 Status StorageManager::load_array_schema(
2609 const URI& array_uri,
2610 OpenArray* open_array,
2611 const EncryptionKey& encryption_key) {
2612 // Do nothing if the array schema is already loaded
2613 if (open_array->array_schema_latest() != nullptr)
2614 return Status::Ok();
2615
2616 auto array_schema = (ArraySchema*)nullptr;
2617 RETURN_NOT_OK(load_array_schema(array_uri, encryption_key, &array_schema));
2618 open_array->set_array_schema(array_schema);
2619
2620 auto&& [st_schemas, schemas] =
2621 load_all_array_schemas(array_uri, encryption_key);
2622
2623 if (!st_schemas.ok())
2624 return st_schemas;
2625
2626 open_array->set_array_schemas_all(schemas.value());
2627
2628 return Status::Ok();
2629 }
2630
load_array_metadata(OpenArray * open_array,const EncryptionKey & encryption_key,const std::vector<TimestampedURI> & array_metadata_to_load,Metadata * metadata)2631 Status StorageManager::load_array_metadata(
2632 OpenArray* open_array,
2633 const EncryptionKey& encryption_key,
2634 const std::vector<TimestampedURI>& array_metadata_to_load,
2635 Metadata* metadata) {
2636 // Special case
2637 if (metadata == nullptr)
2638 return Status::Ok();
2639
2640 auto metadata_num = array_metadata_to_load.size();
2641 std::vector<tdb_shared_ptr<Buffer>> metadata_buffs;
2642 metadata_buffs.resize(metadata_num);
2643 auto status = parallel_for(compute_tp_, 0, metadata_num, [&](size_t m) {
2644 const auto& uri = array_metadata_to_load[m].uri_;
2645 auto metadata_buff = open_array->array_metadata(uri);
2646 if (metadata_buff == nullptr) { // Array metadata does not exist - load it
2647 GenericTileIO tile_io(this, uri);
2648 auto tile = (Tile*)nullptr;
2649 RETURN_NOT_OK(tile_io.read_generic(&tile, 0, encryption_key, config_));
2650
2651 auto buffer = tile->buffer();
2652 metadata_buff = tdb_make_shared(Buffer);
2653 RETURN_NOT_OK(metadata_buff->realloc(buffer->size()));
2654 metadata_buff->set_size(buffer->size());
2655 buffer->reset_offset();
2656 RETURN_NOT_OK_ELSE(
2657 buffer->read(metadata_buff->data(), metadata_buff->size()),
2658 tdb_delete(tile));
2659 tdb_delete(tile);
2660
2661 open_array->insert_array_metadata(uri, metadata_buff);
2662 }
2663 metadata_buffs[m] = metadata_buff;
2664 return Status::Ok();
2665 });
2666 RETURN_NOT_OK(status);
2667
2668 // Compute array metadata size for the statistics
2669 uint64_t meta_size = 0;
2670 for (const auto& b : metadata_buffs)
2671 meta_size += b->size();
2672 stats_->add_counter("read_array_meta_size", meta_size);
2673
2674 // Deserialize metadata buffers
2675 metadata->deserialize(metadata_buffs);
2676
2677 // Sets the loaded metadata URIs
2678 metadata->set_loaded_metadata_uris(array_metadata_to_load);
2679
2680 return Status::Ok();
2681 }
2682
load_fragment_metadata(OpenArray * open_array,const EncryptionKey & encryption_key,const std::vector<TimestampedURI> & fragments_to_load,Buffer * meta_buff,const std::unordered_map<std::string,uint64_t> & offsets,std::vector<tdb_shared_ptr<FragmentMetadata>> * fragment_metadata)2683 Status StorageManager::load_fragment_metadata(
2684 OpenArray* open_array,
2685 const EncryptionKey& encryption_key,
2686 const std::vector<TimestampedURI>& fragments_to_load,
2687 Buffer* meta_buff,
2688 const std::unordered_map<std::string, uint64_t>& offsets,
2689 std::vector<tdb_shared_ptr<FragmentMetadata>>* fragment_metadata) {
2690 auto timer_se = stats_->start_timer("read_load_frag_meta");
2691
2692 // Load the metadata for each fragment, only if they are not already
2693 // loaded
2694 auto fragment_num = fragments_to_load.size();
2695 fragment_metadata->resize(fragment_num);
2696 auto status = parallel_for(compute_tp_, 0, fragment_num, [&](size_t f) {
2697 const auto& sf = fragments_to_load[f];
2698 auto array_schema = open_array->array_schema_latest();
2699
2700 auto metadata = open_array->fragment_metadata(sf.uri_);
2701 if (metadata == nullptr) { // Fragment metadata does not exist - load it
2702 URI coords_uri =
2703 sf.uri_.join_path(constants::coords + constants::file_suffix);
2704
2705 auto name = sf.uri_.remove_trailing_slash().last_path_part();
2706 uint32_t f_version;
2707 RETURN_NOT_OK(utils::parse::get_fragment_name_version(name, &f_version));
2708
2709 // Note that the fragment metadata version is >= the array schema
2710 // version. Therefore, the check below is defensive and will always
2711 // ensure backwards compatibility.
2712 if (f_version == 1) { // This is equivalent to format version <=2
2713 bool sparse;
2714 RETURN_NOT_OK(vfs_->is_file(coords_uri, &sparse));
2715 metadata = tdb_make_shared(
2716 FragmentMetadata,
2717 this,
2718 array_schema,
2719 sf.uri_,
2720 sf.timestamp_range_,
2721 !sparse);
2722 } else { // Format version > 2
2723 metadata = tdb_make_shared(
2724 FragmentMetadata, this, array_schema, sf.uri_, sf.timestamp_range_);
2725 }
2726
2727 // Potentially find the basic fragment metadata in the consolidated
2728 // metadata buffer
2729 Buffer* f_buff = nullptr;
2730 uint64_t offset = 0;
2731
2732 auto it = offsets.end();
2733 if (metadata->format_version() >= 9) {
2734 it = offsets.find(name);
2735 } else {
2736 it = offsets.find(sf.uri_.to_string());
2737 }
2738 if (it != offsets.end()) {
2739 f_buff = meta_buff;
2740 offset = it->second;
2741 }
2742
2743 // Load fragment metadata
2744 RETURN_NOT_OK(metadata->load(
2745 encryption_key, f_buff, offset, open_array->array_schemas_all()));
2746 open_array->insert_fragment_metadata(metadata);
2747 }
2748
2749 (*fragment_metadata)[f] = metadata;
2750 return Status::Ok();
2751 });
2752 RETURN_NOT_OK(status);
2753
2754 return Status::Ok();
2755 }
2756
load_consolidated_fragment_meta(const URI & uri,const EncryptionKey & enc_key,Buffer * f_buff,std::unordered_map<std::string,uint64_t> * offsets)2757 Status StorageManager::load_consolidated_fragment_meta(
2758 const URI& uri,
2759 const EncryptionKey& enc_key,
2760 Buffer* f_buff,
2761 std::unordered_map<std::string, uint64_t>* offsets) {
2762 auto timer_se = stats_->start_timer("read_load_consolidated_frag_meta");
2763
2764 // No consolidated fragment metadata file
2765 if (uri.to_string().empty())
2766 return Status::Ok();
2767
2768 GenericTileIO tile_io(this, uri);
2769 Tile* tile = nullptr;
2770 RETURN_NOT_OK(tile_io.read_generic(&tile, 0, enc_key, config_));
2771
2772 auto buffer = tile->buffer();
2773 f_buff->realloc(buffer->size());
2774 f_buff->set_size(buffer->size());
2775 buffer->reset_offset();
2776 RETURN_NOT_OK_ELSE(
2777 buffer->read(f_buff->data(), f_buff->size()), tdb_delete(tile));
2778 tdb_delete(tile);
2779
2780 stats_->add_counter("consolidated_frag_meta_size", f_buff->size());
2781
2782 uint32_t fragment_num;
2783 f_buff->reset_offset();
2784 f_buff->read(&fragment_num, sizeof(uint32_t));
2785
2786 uint64_t name_size, offset;
2787 std::string name;
2788 for (uint32_t f = 0; f < fragment_num; ++f) {
2789 f_buff->read(&name_size, sizeof(uint64_t));
2790 name.resize(name_size);
2791 f_buff->read(&name[0], name_size);
2792 f_buff->read(&offset, sizeof(uint64_t));
2793 (*offsets)[name] = offset;
2794 }
2795
2796 return Status::Ok();
2797 }
2798
get_consolidated_fragment_meta_uri(const std::vector<URI> & uris,URI * meta_uri) const2799 Status StorageManager::get_consolidated_fragment_meta_uri(
2800 const std::vector<URI>& uris, URI* meta_uri) const {
2801 uint64_t t_latest = 0;
2802 std::pair<uint64_t, uint64_t> timestamp_range;
2803 for (const auto& uri : uris) {
2804 if (utils::parse::ends_with(uri.to_string(), constants::meta_file_suffix)) {
2805 RETURN_NOT_OK(utils::parse::get_timestamp_range(uri, ×tamp_range));
2806 if (timestamp_range.second > t_latest) {
2807 t_latest = timestamp_range.second;
2808 *meta_uri = uri;
2809 }
2810 }
2811 }
2812
2813 return Status::Ok();
2814 }
2815
get_sorted_uris(const std::vector<URI> & uris,std::vector<TimestampedURI> * sorted_uris,uint64_t timestamp_start,uint64_t timestamp_end) const2816 Status StorageManager::get_sorted_uris(
2817 const std::vector<URI>& uris,
2818 std::vector<TimestampedURI>* sorted_uris,
2819 uint64_t timestamp_start,
2820 uint64_t timestamp_end) const {
2821 // Do nothing if there are not enough URIs
2822 if (uris.empty())
2823 return Status::Ok();
2824
2825 // Get the URIs that must be ignored
2826 std::vector<URI> vac_uris, to_ignore;
2827 RETURN_NOT_OK(get_uris_to_vacuum(
2828 uris, timestamp_start, timestamp_end, &to_ignore, &vac_uris, false));
2829 std::set<URI> to_ignore_set;
2830 for (const auto& uri : to_ignore)
2831 to_ignore_set.emplace(uri);
2832
2833 // Filter based on vacuumed URIs and timestamp
2834 for (auto& uri : uris) {
2835 // Ignore vacuumed URIs
2836 if (to_ignore_set.find(uri) != to_ignore_set.end())
2837 continue;
2838
2839 // Also ignore any vac uris
2840 if (this->is_vacuum_file(uri))
2841 continue;
2842
2843 // Add only URIs whose first timestamp is greater than or equal to the
2844 // timestamp_start and whose second timestamp is smaller than or equal to
2845 // the timestamp_end
2846 std::pair<uint64_t, uint64_t> timestamp_range;
2847 RETURN_NOT_OK(utils::parse::get_timestamp_range(uri, ×tamp_range));
2848 auto t1 = timestamp_range.first;
2849 auto t2 = timestamp_range.second;
2850 if (t1 >= timestamp_start && t2 <= timestamp_end)
2851 sorted_uris->emplace_back(uri, timestamp_range);
2852 }
2853
2854 // Sort the names based on the timestamps
2855 std::sort(sorted_uris->begin(), sorted_uris->end());
2856
2857 return Status::Ok();
2858 }
2859
get_uris_to_vacuum(const std::vector<URI> & uris,uint64_t timestamp_start,uint64_t timestamp_end,std::vector<URI> * to_vacuum,std::vector<URI> * vac_uris,bool allow_partial) const2860 Status StorageManager::get_uris_to_vacuum(
2861 const std::vector<URI>& uris,
2862 uint64_t timestamp_start,
2863 uint64_t timestamp_end,
2864 std::vector<URI>* to_vacuum,
2865 std::vector<URI>* vac_uris,
2866 bool allow_partial) const {
2867 // Get vacuum URIs
2868 std::vector<URI> vac_files;
2869 std::unordered_set<std::string> non_vac_uris_set;
2870 std::unordered_map<std::string, size_t> uris_map;
2871 for (size_t i = 0; i < uris.size(); ++i) {
2872 std::pair<uint64_t, uint64_t> timestamp_range;
2873 RETURN_NOT_OK(utils::parse::get_timestamp_range(uris[i], ×tamp_range));
2874
2875 if (this->is_vacuum_file(uris[i])) {
2876 if (allow_partial) {
2877 if (timestamp_range.first <= timestamp_end &&
2878 timestamp_range.second >= timestamp_start)
2879 vac_files.emplace_back(uris[i]);
2880 } else {
2881 if (timestamp_range.first >= timestamp_start &&
2882 timestamp_range.second <= timestamp_end)
2883 vac_files.emplace_back(uris[i]);
2884 }
2885 } else {
2886 if (timestamp_range.first < timestamp_start ||
2887 timestamp_range.second > timestamp_end) {
2888 non_vac_uris_set.emplace(uris[i].to_string());
2889 } else {
2890 uris_map[uris[i].to_string()] = i;
2891 }
2892 }
2893 }
2894
2895 // Compute fragment URIs to vacuum as a bitmap vector
2896 // Also determine which vac files to vacuum
2897 std::vector<int32_t> to_vacuum_vec(uris.size(), 0);
2898 std::vector<int32_t> to_vacuum_vac_files_vec(vac_files.size(), 0);
2899 auto status =
2900 parallel_for(compute_tp_, 0, vac_files.size(), [&, this](size_t i) {
2901 uint64_t size = 0;
2902 RETURN_NOT_OK(vfs_->file_size(vac_files[i], &size));
2903 std::string names;
2904 names.resize(size);
2905 RETURN_NOT_OK(vfs_->read(vac_files[i], 0, &names[0], size));
2906 std::stringstream ss(names);
2907 bool vacuum_vac_file = true;
2908 for (std::string uri_str; std::getline(ss, uri_str);) {
2909 auto it = uris_map.find(uri_str);
2910 if (it != uris_map.end())
2911 to_vacuum_vec[it->second] = 1;
2912
2913 if (vacuum_vac_file &&
2914 non_vac_uris_set.find(uri_str) != non_vac_uris_set.end()) {
2915 vacuum_vac_file = false;
2916 }
2917 }
2918
2919 to_vacuum_vac_files_vec[i] = vacuum_vac_file;
2920
2921 return Status::Ok();
2922 });
2923 RETURN_NOT_OK(status);
2924
2925 // Compute the URIs to vacuum
2926 to_vacuum->clear();
2927 for (size_t i = 0; i < uris.size(); ++i) {
2928 if (to_vacuum_vec[i] == 1)
2929 to_vacuum->emplace_back(uris[i]);
2930 }
2931
2932 // Compute the vac URIs to vacuum
2933 vac_uris->clear();
2934 for (size_t i = 0; i < vac_files.size(); ++i) {
2935 if (to_vacuum_vac_files_vec[i] == 1)
2936 vac_uris->emplace_back(vac_files[i]);
2937 }
2938
2939 return Status::Ok();
2940 }
2941
set_default_tags()2942 Status StorageManager::set_default_tags() {
2943 const auto version = std::to_string(constants::library_version[0]) + "." +
2944 std::to_string(constants::library_version[1]) + "." +
2945 std::to_string(constants::library_version[2]);
2946
2947 RETURN_NOT_OK(set_tag("x-tiledb-version", version));
2948 RETURN_NOT_OK(set_tag("x-tiledb-api-language", "c"));
2949
2950 return Status::Ok();
2951 }
2952
2953 } // namespace sm
2954 } // namespace tiledb
2955