1 /**
2  * @file   storage_manager.cc
3  *
4  * @section LICENSE
5  *
6  * The MIT License
7  *
8  * @copyright Copyright (c) 2017-2021 TileDB, Inc.
9  * @copyright Copyright (c) 2016 MIT and Intel Corporation
10  *
11  * Permission is hereby granted, free of charge, to any person obtaining a copy
12  * of this software and associated documentation files (the "Software"), to deal
13  * in the Software without restriction, including without limitation the rights
14  * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
15  * copies of the Software, and to permit persons to whom the Software is
16  * furnished to do so, subject to the following conditions:
17  *
18  * The above copyright notice and this permission notice shall be included in
19  * all copies or substantial portions of the Software.
20  *
21  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
22  * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
23  * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
24  * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
25  * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
26  * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
27  * THE SOFTWARE.
28  *
29  * @section DESCRIPTION
30  *
31  * This file implements the StorageManager class.
32  */
33 
34 #include <algorithm>
35 #include <functional>
36 #include <iostream>
37 #include <sstream>
38 
39 #include "tiledb/common/heap_memory.h"
40 #include "tiledb/common/logger.h"
41 #include "tiledb/sm/array/array.h"
42 #include "tiledb/sm/array_schema/array_schema.h"
43 #include "tiledb/sm/array_schema/array_schema_evolution.h"
44 #include "tiledb/sm/cache/buffer_lru_cache.h"
45 #include "tiledb/sm/enums/array_type.h"
46 #include "tiledb/sm/enums/layout.h"
47 #include "tiledb/sm/enums/object_type.h"
48 #include "tiledb/sm/enums/query_type.h"
49 #include "tiledb/sm/filesystem/vfs.h"
50 #include "tiledb/sm/fragment/fragment_info.h"
51 #include "tiledb/sm/global_state/global_state.h"
52 #include "tiledb/sm/global_state/unit_test_config.h"
53 #include "tiledb/sm/misc/parallel_functions.h"
54 #include "tiledb/sm/misc/utils.h"
55 #include "tiledb/sm/misc/uuid.h"
56 #include "tiledb/sm/query/query.h"
57 #include "tiledb/sm/rest/rest_client.h"
58 #include "tiledb/sm/stats/global_stats.h"
59 #include "tiledb/sm/storage_manager/consolidator.h"
60 #include "tiledb/sm/storage_manager/open_array.h"
61 #include "tiledb/sm/storage_manager/storage_manager.h"
62 #include "tiledb/sm/tile/generic_tile_io.h"
63 #include "tiledb/sm/tile/tile.h"
64 
65 #include <algorithm>
66 #include <iostream>
67 #include <sstream>
68 
69 using namespace tiledb::common;
70 
71 namespace tiledb {
72 namespace sm {
73 
74 /* ****************************** */
75 /*   CONSTRUCTORS & DESTRUCTORS   */
76 /* ****************************** */
77 
StorageManager(ThreadPool * const compute_tp,ThreadPool * const io_tp,stats::Stats * const parent_stats,tdb_shared_ptr<Logger> logger)78 StorageManager::StorageManager(
79     ThreadPool* const compute_tp,
80     ThreadPool* const io_tp,
81     stats::Stats* const parent_stats,
82     tdb_shared_ptr<Logger> logger)
83     : stats_(parent_stats->create_child("StorageManager"))
84     , logger_(logger)
85     , cancellation_in_progress_(false)
86     , queries_in_progress_(0)
87     , compute_tp_(compute_tp)
88     , io_tp_(io_tp)
89     , vfs_(nullptr) {
90 }
91 
~StorageManager()92 StorageManager::~StorageManager() {
93   global_state::GlobalState::GetGlobalState().unregister_storage_manager(this);
94 
95   if (vfs_ != nullptr)
96     cancel_all_tasks();
97 
98   // Release all filelocks and delete all opened arrays for reads
99   for (auto& open_array_it : open_arrays_for_reads_) {
100     open_array_it.second->file_unlock(vfs_);
101     tdb_delete(open_array_it.second);
102   }
103 
104   // Delete all opened arrays for writes
105   for (auto& open_array_it : open_arrays_for_writes_)
106     tdb_delete(open_array_it.second);
107 
108   for (auto& fl_it : xfilelocks_) {
109     auto filelock = fl_it.second;
110     auto lock_uri = URI(fl_it.first).join_path(constants::filelock_name);
111     if (filelock != INVALID_FILELOCK)
112       vfs_->filelock_unlock(lock_uri);
113   }
114 
115   if (vfs_ != nullptr) {
116     const Status st = vfs_->terminate();
117     if (!st.ok()) {
118       logger_->status(Status::StorageManagerError("Failed to terminate VFS."));
119     }
120 
121     tdb_delete(vfs_);
122   }
123 }
124 
125 /* ****************************** */
126 /*               API              */
127 /* ****************************** */
128 
array_close_for_reads(const URI & array_uri)129 Status StorageManager::array_close_for_reads(const URI& array_uri) {
130   // Lock mutex
131   std::lock_guard<std::mutex> lock{open_array_for_reads_mtx_};
132 
133   // Find the open array entry
134   auto it = open_arrays_for_reads_.find(array_uri.to_string());
135 
136   // Do nothing if array is closed
137   if (it == open_arrays_for_reads_.end()) {
138     return Status::Ok();
139   }
140 
141   // For easy reference
142   OpenArray* open_array = it->second;
143 
144   // Lock the mutex of the array and decrement counter
145   open_array->mtx_lock();
146   open_array->cnt_decr();
147 
148   // Close the array if the counter reaches 0
149   if (open_array->cnt() == 0) {
150     // Release file lock
151     auto st = open_array->file_unlock(vfs_);
152     if (!st.ok()) {
153       open_array->mtx_unlock();
154       return st;
155     }
156     // Remove open array entry
157     open_array->mtx_unlock();
158     tdb_delete(open_array);
159     open_arrays_for_reads_.erase(it);
160   } else {  // Just unlock the array mutex
161     open_array->mtx_unlock();
162   }
163 
164   xlock_cv_.notify_all();
165 
166   return Status::Ok();
167 }
168 
array_close_for_writes(const URI & array_uri,const EncryptionKey & encryption_key,Metadata * array_metadata)169 Status StorageManager::array_close_for_writes(
170     const URI& array_uri,
171     const EncryptionKey& encryption_key,
172     Metadata* array_metadata) {
173   // Lock mutex
174   std::lock_guard<std::mutex> lock{open_array_for_writes_mtx_};
175 
176   // Find the open array entry
177   auto it = open_arrays_for_writes_.find(array_uri.to_string());
178 
179   // Do nothing if array is closed
180   if (it == open_arrays_for_writes_.end()) {
181     return Status::Ok();
182   }
183 
184   // For easy reference
185   OpenArray* open_array = it->second;
186 
187   // Flush the array metadata
188   RETURN_NOT_OK(
189       store_array_metadata(array_uri, encryption_key, array_metadata));
190 
191   // Lock the mutex of the array and decrement counter
192   open_array->mtx_lock();
193   open_array->cnt_decr();
194 
195   // Close the array if the counter reaches 0
196   if (open_array->cnt() == 0) {
197     open_array->mtx_unlock();
198     tdb_delete(open_array);
199     open_arrays_for_writes_.erase(it);
200   } else {  // Just unlock the array mutex
201     open_array->mtx_unlock();
202   }
203 
204   return Status::Ok();
205 }
206 
207 std::tuple<
208     Status,
209     std::optional<ArraySchema*>,
210     std::optional<std::unordered_map<std::string, tdb_shared_ptr<ArraySchema>>>,
211     std::optional<std::vector<tdb_shared_ptr<FragmentMetadata>>>>
array_open_for_reads(const URI & array_uri,const EncryptionKey & enc_key,uint64_t timestamp_start,uint64_t timestamp_end)212 StorageManager::array_open_for_reads(
213     const URI& array_uri,
214     const EncryptionKey& enc_key,
215     uint64_t timestamp_start,
216     uint64_t timestamp_end) {
217   auto timer_se = stats_->start_timer("read_array_open");
218 
219   /* NOTE: these variables may be modified on a different thread
220            in the load_array_fragments_task below.
221   */
222   std::vector<TimestampedURI> fragments_to_load;
223   std::vector<URI> fragment_uris;
224   URI meta_uri;
225   Buffer f_buff;
226   std::unordered_map<std::string, uint64_t> offsets;
227 
228   // Fetch array fragments async
229   std::vector<ThreadPool::Task> load_array_fragments_task;
230   load_array_fragments_task.emplace_back(io_tp_->execute([array_uri,
231                                                           &enc_key,
232                                                           &f_buff,
233                                                           &fragments_to_load,
234                                                           &fragment_uris,
235                                                           &meta_uri,
236                                                           &offsets,
237                                                           &timestamp_start,
238                                                           &timestamp_end,
239                                                           this]() {
240     // Determine which fragments to load
241     RETURN_NOT_OK(get_fragment_uris(array_uri, &fragment_uris, &meta_uri));
242     RETURN_NOT_OK(get_sorted_uris(
243         fragment_uris, &fragments_to_load, timestamp_start, timestamp_end));
244     // Get the consolidated fragment metadata
245     RETURN_NOT_OK(
246         load_consolidated_fragment_meta(meta_uri, enc_key, &f_buff, &offsets));
247     return Status::Ok();
248   }));
249 
250   // Wait for array fragments to be loaded.
251   Status st = io_tp_->wait_all(load_array_fragments_task);
252 
253   auto open_array = (OpenArray*)nullptr;
254   st = array_open_without_fragments(array_uri, enc_key, &open_array);
255 
256   if (!st.ok()) {
257     io_tp_->wait_all(load_array_fragments_task);
258     return {st, std::nullopt, std::nullopt, std::nullopt};
259   }
260 
261   if (!st.ok()) {
262     open_array->mtx_unlock();
263     array_close_for_reads(array_uri);
264     return {st, std::nullopt, std::nullopt, std::nullopt};
265   }
266 
267   // Get fragment metadata in the case of reads, if not fetched already
268   std::vector<tdb_shared_ptr<FragmentMetadata>> fragment_metadata;
269   st = load_fragment_metadata(
270       open_array,
271       enc_key,
272       fragments_to_load,
273       &f_buff,
274       offsets,
275       &fragment_metadata);
276 
277   if (!st.ok()) {
278     open_array->mtx_unlock();
279     array_close_for_reads(array_uri);
280     return {st, std::nullopt, std::nullopt, std::nullopt};
281   }
282 
283   // Unlock the array mutex
284   open_array->mtx_unlock();
285 
286   // Note that we retain the (shared) lock on the array filelock
287   return {Status::Ok(),
288           open_array->array_schema_latest(),
289           open_array->array_schemas_all(),
290           fragment_metadata};
291 }
292 
293 std::tuple<
294     Status,
295     std::optional<ArraySchema*>,
296     std::optional<std::unordered_map<std::string, tdb_shared_ptr<ArraySchema>>>>
array_open_for_reads_without_fragments(const URI & array_uri,const EncryptionKey & enc_key)297 StorageManager::array_open_for_reads_without_fragments(
298     const URI& array_uri, const EncryptionKey& enc_key) {
299   auto timer_se = stats_->start_timer("read_array_open");
300 
301   auto open_array = (OpenArray*)nullptr;
302   Status st = array_open_without_fragments(array_uri, enc_key, &open_array);
303   if (!st.ok()) {
304     return {st, std::nullopt, std::nullopt};
305   }
306 
307   // Unlock the array mutex
308   open_array->mtx_unlock();
309 
310   // Note that we retain the (shared) lock on the array filelock
311   return {Status::Ok(),
312           open_array->array_schema_latest(),
313           open_array->array_schemas_all()};
314 }
315 
316 std::tuple<
317     Status,
318     std::optional<ArraySchema*>,
319     std::optional<std::unordered_map<std::string, tdb_shared_ptr<ArraySchema>>>>
array_open_for_writes(const URI & array_uri,const EncryptionKey & encryption_key)320 StorageManager::array_open_for_writes(
321     const URI& array_uri, const EncryptionKey& encryption_key) {
322   if (!vfs_->supports_uri_scheme(array_uri))
323     return {logger_->status(Status::StorageManagerError(
324                 "Cannot open array; URI scheme unsupported.")),
325             std::nullopt,
326             std::nullopt};
327 
328   // Check if array exists
329   ObjectType obj_type;
330   auto st = this->object_type(array_uri, &obj_type);
331   if (!st.ok())
332     return {st, std::nullopt, std::nullopt};
333 
334   if (obj_type != ObjectType::ARRAY) {
335     return {logger_->status(Status::StorageManagerError(
336                 "Cannot open array; Array does not exist")),
337             std::nullopt,
338             std::nullopt};
339   }
340 
341   auto open_array = (OpenArray*)nullptr;
342 
343   // Lock mutex
344   {
345     std::lock_guard<std::mutex> lock{open_array_for_writes_mtx_};
346 
347     // Find the open array entry and check key correctness
348     auto it = open_arrays_for_writes_.find(array_uri.to_string());
349     if (it != open_arrays_for_writes_.end()) {
350       st = it->second->set_encryption_key(encryption_key);
351       if (!st.ok())
352         return {st, std::nullopt, std::nullopt};
353 
354       open_array = it->second;
355     } else {  // Create a new entry
356       open_array = tdb_new(OpenArray, array_uri, QueryType::WRITE);
357       st = open_array->set_encryption_key(encryption_key);
358       if (!st.ok()) {
359         tdb_delete(open_array) return {st, std::nullopt, std::nullopt};
360       }
361 
362       open_arrays_for_writes_[array_uri.to_string()] = open_array;
363     }
364 
365     // Lock the array and increment counter
366     open_array->mtx_lock();
367     open_array->cnt_incr();
368   }
369 
370   // No shared filelock needed to be acquired
371 
372   // When opening the array we want to always reload the schema
373   // Fragments are always relisted, now with schema evolution we
374   // need to make sure we list out any new schemas too.
375   // Fragment listing is significantly slower than listing schemas
376   // and they happen in parallel so this is low impact
377   st = load_array_schema(array_uri, open_array, encryption_key);
378   if (!st.ok()) {
379     open_array->mtx_unlock();
380     array_close_for_writes(array_uri, encryption_key, nullptr);
381     return {st, std::nullopt, std::nullopt};
382   }
383 
384   // This library should not be able to write to newer-versioned arrays
385   // (but it is ok to write to older arrays)
386   if (open_array->array_schema_latest()->version() >
387       constants::format_version) {
388     std::stringstream err;
389     err << "Cannot open array for writes; Array format version (";
390     err << open_array->array_schema_latest()->version();
391     err << ") is newer than library format version (";
392     err << constants::format_version << ")";
393     open_array->mtx_unlock();
394     array_close_for_writes(array_uri, encryption_key, nullptr);
395     return {logger_->status(Status::StorageManagerError(err.str())),
396             std::nullopt,
397             std::nullopt};
398   }
399 
400   // Unlock the array mutex
401   open_array->mtx_unlock();
402 
403   return {Status::Ok(),
404           open_array->array_schema_latest(),
405           open_array->array_schemas_all()};
406 }
407 
array_load_fragments(const URI & array_uri,const EncryptionKey & enc_key,std::vector<tdb_shared_ptr<FragmentMetadata>> * fragment_metadata,const std::vector<TimestampedURI> & fragments_to_load)408 Status StorageManager::array_load_fragments(
409     const URI& array_uri,
410     const EncryptionKey& enc_key,
411     std::vector<tdb_shared_ptr<FragmentMetadata>>* fragment_metadata,
412     const std::vector<TimestampedURI>& fragments_to_load) {
413   auto timer_se = stats_->start_timer("read_array_open");
414 
415   auto open_array = (OpenArray*)nullptr;
416   // Lock mutex
417   {
418     std::lock_guard<std::mutex> lock{open_array_for_reads_mtx_};
419 
420     // Find the open array entry
421     auto it = open_arrays_for_reads_.find(array_uri.to_string());
422     if (it == open_arrays_for_reads_.end()) {
423       return logger_->status(Status::StorageManagerError(
424           std::string("Cannot reopen array ") + array_uri.to_string() +
425           "; Array not open"));
426     }
427     RETURN_NOT_OK(it->second->set_encryption_key(enc_key));
428     open_array = it->second;
429 
430     // Lock the array
431     open_array->mtx_lock();
432   }
433 
434   std::unordered_map<std::string, uint64_t> offsets;
435 
436   // Get fragment metadata in the case of reads, if not fetched already
437   auto st = load_fragment_metadata(
438       open_array,
439       enc_key,
440       fragments_to_load,
441       nullptr,
442       offsets,
443       fragment_metadata);
444   if (!st.ok()) {
445     open_array->mtx_unlock();
446     array_close_for_reads(array_uri);
447     return st;
448   }
449 
450   // Unlock the mutexes
451   open_array->mtx_unlock();
452 
453   return st;
454 }
455 
456 std::tuple<
457     Status,
458     std::optional<ArraySchema*>,
459     std::optional<std::unordered_map<std::string, tdb_shared_ptr<ArraySchema>>>,
460     std::optional<std::vector<tdb_shared_ptr<FragmentMetadata>>>>
array_reopen(const URI & array_uri,const EncryptionKey & enc_key,uint64_t timestamp_start,uint64_t timestamp_end)461 StorageManager::array_reopen(
462     const URI& array_uri,
463     const EncryptionKey& enc_key,
464     uint64_t timestamp_start,
465     uint64_t timestamp_end) {
466   auto timer_se = stats_->start_timer("read_array_open");
467 
468   auto open_array = (OpenArray*)nullptr;
469 
470   // Lock mutex
471   {
472     std::lock_guard<std::mutex> lock{open_array_for_reads_mtx_};
473 
474     // Find the open array entry
475     auto it = open_arrays_for_reads_.find(array_uri.to_string());
476     if (it == open_arrays_for_reads_.end()) {
477       return {logger_->status(Status::StorageManagerError(
478                   std::string("Cannot reopen array ") + array_uri.to_string() +
479                   "; Array not open")),
480               std::nullopt,
481               std::nullopt,
482               std::nullopt};
483     }
484     auto st = it->second->set_encryption_key(enc_key);
485     if (!st.ok())
486       return {st, std::nullopt, std::nullopt, std::nullopt};
487 
488     open_array = it->second;
489 
490     // Lock the array
491     open_array->mtx_lock();
492   }
493 
494   // Determine which fragments to load
495   std::vector<TimestampedURI> fragments_to_load;
496   std::vector<URI> fragment_uris;
497   URI meta_uri;
498   auto st = get_fragment_uris(array_uri, &fragment_uris, &meta_uri);
499   if (!st.ok())
500     return {st, std::nullopt, std::nullopt, std::nullopt};
501   st = get_sorted_uris(
502       fragment_uris, &fragments_to_load, timestamp_start, timestamp_end);
503   if (!st.ok())
504     return {st, std::nullopt, std::nullopt, std::nullopt};
505 
506   // Get the consolidated fragment metadata
507   Buffer f_buff;
508   std::unordered_map<std::string, uint64_t> offsets;
509   st = load_consolidated_fragment_meta(meta_uri, enc_key, &f_buff, &offsets);
510   if (!st.ok())
511     return {st, std::nullopt, std::nullopt, std::nullopt};
512 
513   // Get fragment metadata in the case of reads, if not fetched already
514   std::vector<tdb_shared_ptr<FragmentMetadata>> fragment_metadata;
515   st = load_fragment_metadata(
516       open_array,
517       enc_key,
518       fragments_to_load,
519       &f_buff,
520       offsets,
521       &fragment_metadata);
522   if (!st.ok()) {
523     open_array->mtx_unlock();
524     array_close_for_reads(array_uri);
525     return {st, std::nullopt, std::nullopt, std::nullopt};
526   }
527 
528   // Unlock the mutexes
529   open_array->mtx_unlock();
530 
531   return {st,
532           open_array->array_schema_latest(),
533           open_array->array_schemas_all(),
534           fragment_metadata};
535 }
536 
array_consolidate(const char * array_name,EncryptionType encryption_type,const void * encryption_key,uint32_t key_length,const Config * config)537 Status StorageManager::array_consolidate(
538     const char* array_name,
539     EncryptionType encryption_type,
540     const void* encryption_key,
541     uint32_t key_length,
542     const Config* config) {
543   // Check array URI
544   URI array_uri(array_name);
545   if (array_uri.is_invalid()) {
546     return logger_->status(
547         Status::StorageManagerError("Cannot consolidate array; Invalid URI"));
548   }
549 
550   // Check if array exists
551   ObjectType obj_type;
552   RETURN_NOT_OK(object_type(array_uri, &obj_type));
553 
554   if (obj_type != ObjectType::ARRAY) {
555     return logger_->status(Status::StorageManagerError(
556         "Cannot consolidate array; Array does not exist"));
557   }
558 
559   // If 'config' is unset, use the 'config_' that was set during initialization
560   // of this StorageManager instance.
561   if (!config) {
562     config = &config_;
563   }
564 
565   // Get encryption key from config
566   std::string encryption_key_from_cfg;
567   if (!encryption_key) {
568     bool found = false;
569     encryption_key_from_cfg = config->get("sm.encryption_key", &found);
570     assert(found);
571   }
572 
573   if (!encryption_key_from_cfg.empty()) {
574     encryption_key = encryption_key_from_cfg.c_str();
575     std::string encryption_type_from_cfg;
576     bool found = false;
577     encryption_type_from_cfg = config->get("sm.encryption_type", &found);
578     assert(found);
579     auto [st, et] = encryption_type_enum(encryption_type_from_cfg);
580     RETURN_NOT_OK(st);
581     encryption_type = et.value();
582 
583     if (EncryptionKey::is_valid_key_length(
584             encryption_type,
585             static_cast<uint32_t>(encryption_key_from_cfg.size()))) {
586       const UnitTestConfig& unit_test_cfg = UnitTestConfig::instance();
587       if (unit_test_cfg.array_encryption_key_length.is_set()) {
588         key_length = unit_test_cfg.array_encryption_key_length.get();
589       } else {
590         key_length = static_cast<uint32_t>(encryption_key_from_cfg.size());
591       }
592     } else {
593       encryption_key = nullptr;
594       key_length = 0;
595     }
596   }
597 
598   // Consolidate
599   Consolidator consolidator(this);
600   return consolidator.consolidate(
601       array_name, encryption_type, encryption_key, key_length, config);
602 }
603 
array_vacuum(const char * array_name,const Config * config)604 Status StorageManager::array_vacuum(
605     const char* array_name, const Config* config) {
606   // If 'config' is unset, use the 'config_' that was set during initialization
607   // of this StorageManager instance.
608   if (!config) {
609     config = &config_;
610   }
611 
612   // Get mode
613   const char* mode;
614   RETURN_NOT_OK(config->get("sm.vacuum.mode", &mode));
615 
616   bool found = false;
617   uint64_t timestamp_start;
618   RETURN_NOT_OK(config->get<uint64_t>(
619       "sm.vacuum.timestamp_start", &timestamp_start, &found));
620   assert(found);
621 
622   uint64_t timestamp_end;
623   RETURN_NOT_OK(
624       config->get<uint64_t>("sm.vacuum.timestamp_end", &timestamp_end, &found));
625   assert(found);
626 
627   if (mode == nullptr)
628     return logger_->status(Status::StorageManagerError(
629         "Cannot vacuum array; Vacuum mode cannot be null"));
630   else if (std::string(mode) == "fragments")
631     RETURN_NOT_OK(
632         array_vacuum_fragments(array_name, timestamp_start, timestamp_end));
633   else if (std::string(mode) == "fragment_meta")
634     RETURN_NOT_OK(array_vacuum_fragment_meta(array_name));
635   else if (std::string(mode) == "array_meta")
636     RETURN_NOT_OK(
637         array_vacuum_array_meta(array_name, timestamp_start, timestamp_end));
638   else
639     return logger_->status(Status::StorageManagerError(
640         "Cannot vacuum array; Invalid vacuum mode"));
641 
642   return Status::Ok();
643 }
644 
array_vacuum_fragments(const char * array_name,uint64_t timestamp_start,uint64_t timestamp_end)645 Status StorageManager::array_vacuum_fragments(
646     const char* array_name, uint64_t timestamp_start, uint64_t timestamp_end) {
647   if (array_name == nullptr)
648     return logger_->status(Status::StorageManagerError(
649         "Cannot vacuum fragments; Array name cannot be null"));
650 
651   // Get all URIs in the array directory
652   URI array_uri(array_name);
653   std::vector<URI> uris;
654   RETURN_NOT_OK(vfs_->ls(array_uri.add_trailing_slash(), &uris));
655 
656   // Get URIs to be vacuumed
657   std::vector<URI> to_vacuum, vac_uris;
658   RETURN_NOT_OK(get_uris_to_vacuum(
659       uris, timestamp_start, timestamp_end, &to_vacuum, &vac_uris));
660 
661   // Delete the ok files
662   RETURN_NOT_OK(array_xlock(array_uri));
663   auto status =
664       parallel_for(compute_tp_, 0, to_vacuum.size(), [&, this](size_t i) {
665         auto uri = URI(to_vacuum[i].to_string() + constants::ok_file_suffix);
666         RETURN_NOT_OK(vfs_->remove_file(uri));
667 
668         return Status::Ok();
669       });
670   RETURN_NOT_OK_ELSE(status, array_xunlock(array_uri));
671   RETURN_NOT_OK(array_xunlock(array_uri));
672 
673   // Delete fragment directories
674   status = parallel_for(compute_tp_, 0, to_vacuum.size(), [&, this](size_t i) {
675     RETURN_NOT_OK(vfs_->remove_dir(to_vacuum[i]));
676 
677     return Status::Ok();
678   });
679   RETURN_NOT_OK(status);
680 
681   // Delete vacuum files
682   status = parallel_for(compute_tp_, 0, vac_uris.size(), [&, this](size_t i) {
683     RETURN_NOT_OK(vfs_->remove_file(vac_uris[i]));
684     return Status::Ok();
685   });
686   RETURN_NOT_OK(status);
687 
688   return Status::Ok();
689 }
690 
array_vacuum_fragment_meta(const char * array_name)691 Status StorageManager::array_vacuum_fragment_meta(const char* array_name) {
692   if (array_name == nullptr)
693     return logger_->status(Status::StorageManagerError(
694         "Cannot vacuum fragment metadata; Array name cannot be null"));
695 
696   // Get the consolidated fragment metadata URIs to be deleted
697   // (all except the last one)
698   URI array_uri(array_name);
699   std::vector<URI> uris, to_vacuum;
700   URI last;
701   RETURN_NOT_OK(vfs_->ls(array_uri.add_trailing_slash(), &uris));
702   RETURN_NOT_OK(get_consolidated_fragment_meta_uri(uris, &last));
703   to_vacuum.reserve(uris.size());
704   for (const auto& uri : uris) {
705     if (utils::parse::ends_with(uri.to_string(), constants::meta_file_suffix) &&
706         uri != last)
707       to_vacuum.emplace_back(uri);
708   }
709 
710   // Vacuum after exclusively locking the array
711   RETURN_NOT_OK(array_xlock(array_uri));
712   auto status =
713       parallel_for(compute_tp_, 0, to_vacuum.size(), [&, this](size_t i) {
714         RETURN_NOT_OK(vfs_->remove_file(to_vacuum[i]));
715         return Status::Ok();
716       });
717   RETURN_NOT_OK_ELSE(status, array_xunlock(array_uri));
718   RETURN_NOT_OK(array_xunlock(array_uri));
719 
720   return Status::Ok();
721 }
722 
array_vacuum_array_meta(const char * array_name,uint64_t timestamp_start,uint64_t timestamp_end)723 Status StorageManager::array_vacuum_array_meta(
724     const char* array_name, uint64_t timestamp_start, uint64_t timestamp_end) {
725   if (array_name == nullptr)
726     return logger_->status(Status::StorageManagerError(
727         "Cannot vacuum array metadata; Array name cannot be null"));
728 
729   // Get all URIs in the array directory
730   URI array_uri(array_name);
731   auto meta_uri = array_uri.join_path(constants::array_metadata_folder_name);
732   std::vector<URI> uris;
733   RETURN_NOT_OK(vfs_->ls(meta_uri.add_trailing_slash(), &uris));
734 
735   // Get URIs to be vacuumed
736   std::vector<URI> to_vacuum, vac_uris;
737   RETURN_NOT_OK(get_uris_to_vacuum(
738       uris, timestamp_start, timestamp_end, &to_vacuum, &vac_uris));
739 
740   // Delete the array metadata files
741   RETURN_NOT_OK(array_xlock(array_uri));
742   auto status =
743       parallel_for(compute_tp_, 0, to_vacuum.size(), [&, this](size_t i) {
744         RETURN_NOT_OK(vfs_->remove_file(to_vacuum[i]));
745 
746         return Status::Ok();
747       });
748   RETURN_NOT_OK_ELSE(status, array_xunlock(array_uri));
749   RETURN_NOT_OK(array_xunlock(array_uri));
750 
751   // Delete vacuum files
752   status = parallel_for(compute_tp_, 0, vac_uris.size(), [&, this](size_t i) {
753     RETURN_NOT_OK(vfs_->remove_file(vac_uris[i]));
754     return Status::Ok();
755   });
756   RETURN_NOT_OK(status);
757 
758   return Status::Ok();
759 }
760 
array_metadata_consolidate(const char * array_name,EncryptionType encryption_type,const void * encryption_key,uint32_t key_length,const Config * config)761 Status StorageManager::array_metadata_consolidate(
762     const char* array_name,
763     EncryptionType encryption_type,
764     const void* encryption_key,
765     uint32_t key_length,
766     const Config* config) {
767   // Check array URI
768   URI array_uri(array_name);
769   if (array_uri.is_invalid()) {
770     return logger_->status(Status::StorageManagerError(
771         "Cannot consolidate array metadata; Invalid URI"));
772   }
773   // Check if array exists
774   ObjectType obj_type;
775   RETURN_NOT_OK(object_type(array_uri, &obj_type));
776 
777   if (obj_type != ObjectType::ARRAY) {
778     return logger_->status(Status::StorageManagerError(
779         "Cannot consolidate array metadata; Array does not exist"));
780   }
781 
782   // If 'config' is unset, use the 'config_' that was set during initialization
783   // of this StorageManager instance.
784   if (!config) {
785     config = &config_;
786   }
787 
788   // Get encryption key from config
789   std::string encryption_key_from_cfg;
790   if (!encryption_key) {
791     bool found = false;
792     encryption_key_from_cfg = config->get("sm.encryption_key", &found);
793     assert(found);
794   }
795 
796   if (!encryption_key_from_cfg.empty()) {
797     encryption_key = encryption_key_from_cfg.c_str();
798     std::string encryption_type_from_cfg;
799     bool found = false;
800     encryption_type_from_cfg = config->get("sm.encryption_type", &found);
801     assert(found);
802     auto [st, et] = encryption_type_enum(encryption_type_from_cfg);
803     RETURN_NOT_OK(st);
804     encryption_type = et.value();
805 
806     if (EncryptionKey::is_valid_key_length(
807             encryption_type,
808             static_cast<uint32_t>(encryption_key_from_cfg.size()))) {
809       const UnitTestConfig& unit_test_cfg = UnitTestConfig::instance();
810       if (unit_test_cfg.array_encryption_key_length.is_set()) {
811         key_length = unit_test_cfg.array_encryption_key_length.get();
812       } else {
813         key_length = static_cast<uint32_t>(encryption_key_from_cfg.size());
814       }
815     } else {
816       encryption_key = nullptr;
817       key_length = 0;
818     }
819   }
820 
821   // Consolidate
822   Consolidator consolidator(this);
823   return consolidator.consolidate_array_meta(
824       array_name, encryption_type, encryption_key, key_length);
825 }
826 
array_create(const URI & array_uri,ArraySchema * array_schema,const EncryptionKey & encryption_key)827 Status StorageManager::array_create(
828     const URI& array_uri,
829     ArraySchema* array_schema,
830     const EncryptionKey& encryption_key) {
831   // Check array schema
832   if (array_schema == nullptr) {
833     return logger_->status(
834         Status::StorageManagerError("Cannot create array; Empty array schema"));
835   }
836 
837   // Check if array exists
838   bool exists = false;
839   RETURN_NOT_OK(is_array(array_uri, &exists));
840   if (exists)
841     return logger_->status(Status::StorageManagerError(
842         std::string("Cannot create array; Array '") + array_uri.c_str() +
843         "' already exists"));
844 
845   std::lock_guard<std::mutex> lock{object_create_mtx_};
846   array_schema->set_array_uri(array_uri);
847   RETURN_NOT_OK(array_schema->generate_uri());
848   RETURN_NOT_OK(array_schema->check());
849 
850   // Create array directory
851   RETURN_NOT_OK(vfs_->create_dir(array_uri));
852 
853   // Create array schema directory
854   URI array_schema_folder_uri =
855       array_uri.join_path(constants::array_schema_folder_name);
856   RETURN_NOT_OK(vfs_->create_dir(array_schema_folder_uri));
857 
858   // Create array metadata directory
859   URI array_metadata_uri =
860       array_uri.join_path(constants::array_metadata_folder_name);
861   RETURN_NOT_OK(vfs_->create_dir(array_metadata_uri));
862   Status st;
863 
864   // Get encryption key from config
865   if (encryption_key.encryption_type() == EncryptionType::NO_ENCRYPTION) {
866     bool found = false;
867     std::string encryption_key_from_cfg =
868         config_.get("sm.encryption_key", &found);
869     assert(found);
870     std::string encryption_type_from_cfg =
871         config_.get("sm.encryption_type", &found);
872     assert(found);
873     auto [st, etc] = encryption_type_enum(encryption_type_from_cfg);
874     RETURN_NOT_OK(st);
875     EncryptionType encryption_type_cfg = etc.value();
876 
877     EncryptionKey encryption_key_cfg;
878     if (encryption_key_from_cfg.empty()) {
879       RETURN_NOT_OK(
880           encryption_key_cfg.set_key(encryption_type_cfg, nullptr, 0));
881     } else {
882       uint32_t key_length = 0;
883       if (EncryptionKey::is_valid_key_length(
884               encryption_type_cfg,
885               static_cast<uint32_t>(encryption_key_from_cfg.size()))) {
886         const UnitTestConfig& unit_test_cfg = UnitTestConfig::instance();
887         if (unit_test_cfg.array_encryption_key_length.is_set()) {
888           key_length = unit_test_cfg.array_encryption_key_length.get();
889         } else {
890           key_length = static_cast<uint32_t>(encryption_key_from_cfg.size());
891         }
892       }
893       RETURN_NOT_OK(encryption_key_cfg.set_key(
894           encryption_type_cfg,
895           (const void*)encryption_key_from_cfg.c_str(),
896           key_length));
897     }
898     st = store_array_schema(array_schema, encryption_key_cfg);
899   } else {
900     st = store_array_schema(array_schema, encryption_key);
901   }
902 
903   // Store array schema
904   if (!st.ok()) {
905     vfs_->remove_dir(array_uri);
906     return st;
907   }
908 
909   // Create array and array metadata filelocks
910   URI array_filelock_uri = array_uri.join_path(constants::filelock_name);
911   st = vfs_->touch(array_filelock_uri);
912   if (!st.ok()) {
913     vfs_->remove_dir(array_uri);
914     return st;
915   }
916 
917   return Status::Ok();
918 }
919 
array_evolve_schema(const URI & array_uri,ArraySchemaEvolution * schema_evolution,const EncryptionKey & encryption_key)920 Status StorageManager::array_evolve_schema(
921     const URI& array_uri,
922     ArraySchemaEvolution* schema_evolution,
923     const EncryptionKey& encryption_key) {
924   // Check array schema
925   if (schema_evolution == nullptr) {
926     return logger_->status(Status::StorageManagerError(
927         "Cannot evolve array; Empty schema evolution"));
928   }
929 
930   if (array_uri.is_tiledb()) {
931     return rest_client_->post_array_schema_evolution_to_rest(
932         array_uri, schema_evolution);
933   }
934 
935   // Check if array exists
936   bool exists = false;
937   RETURN_NOT_OK(is_array(array_uri, &exists));
938   if (!exists)
939     return logger_->status(Status::StorageManagerError(
940         std::string("Cannot evolve array; Array '") + array_uri.c_str() +
941         "' not exists"));
942 
943   ArraySchema* array_schema = (ArraySchema*)nullptr;
944   RETURN_NOT_OK(load_array_schema(array_uri, encryption_key, &array_schema));
945 
946   // Evolve schema
947   ArraySchema* array_schema_evolved = (ArraySchema*)nullptr;
948   RETURN_NOT_OK(
949       schema_evolution->evolve_schema(array_schema, &array_schema_evolved));
950 
951   Status st = store_array_schema(array_schema_evolved, encryption_key);
952   if (!st.ok()) {
953     tdb_delete(array_schema_evolved);
954     logger_->status(st);
955     return logger_->status(Status::StorageManagerError(
956         "Cannot evovle schema;  Not able to store evolved array schema."));
957   }
958 
959   tdb_delete(array_schema);
960   array_schema = nullptr;
961   tdb_delete(array_schema_evolved);
962   array_schema_evolved = nullptr;
963 
964   return Status::Ok();
965 }
966 
array_upgrade_version(const URI & array_uri,const Config * config)967 Status StorageManager::array_upgrade_version(
968     const URI& array_uri, const Config* config) {
969   // Check if array exists
970   bool exists = false;
971   RETURN_NOT_OK(is_array(array_uri, &exists));
972   if (!exists)
973     return logger_->status(Status::StorageManagerError(
974         std::string("Cannot upgrade array; Array '") + array_uri.c_str() +
975         "' does not exist"));
976 
977   // If 'config' is unset, use the 'config_' that was set during initialization
978   // of this StorageManager instance.
979   if (!config) {
980     config = &config_;
981   }
982 
983   // Get encryption key from config
984   bool found = false;
985   std::string encryption_key_from_cfg =
986       config_.get("sm.encryption_key", &found);
987   assert(found);
988   std::string encryption_type_from_cfg =
989       config_.get("sm.encryption_type", &found);
990   assert(found);
991   auto [st, etc] = encryption_type_enum(encryption_type_from_cfg);
992   RETURN_NOT_OK(st);
993   EncryptionType encryption_type_cfg = etc.value();
994 
995   EncryptionKey encryption_key_cfg;
996   if (encryption_key_from_cfg.empty()) {
997     RETURN_NOT_OK(encryption_key_cfg.set_key(encryption_type_cfg, nullptr, 0));
998   } else {
999     uint32_t key_length = 0;
1000     if (EncryptionKey::is_valid_key_length(
1001             encryption_type_cfg,
1002             static_cast<uint32_t>(encryption_key_from_cfg.size()))) {
1003       const UnitTestConfig& unit_test_cfg = UnitTestConfig::instance();
1004       if (unit_test_cfg.array_encryption_key_length.is_set()) {
1005         key_length = unit_test_cfg.array_encryption_key_length.get();
1006       } else {
1007         key_length = static_cast<uint32_t>(encryption_key_from_cfg.size());
1008       }
1009     }
1010     RETURN_NOT_OK(encryption_key_cfg.set_key(
1011         encryption_type_cfg,
1012         (const void*)encryption_key_from_cfg.c_str(),
1013         key_length));
1014   }
1015 
1016   ArraySchema* array_schema = (ArraySchema*)nullptr;
1017   RETURN_NOT_OK(
1018       load_array_schema(array_uri, encryption_key_cfg, &array_schema));
1019 
1020   if (array_schema->version() < constants::format_version) {
1021     Status st = array_schema->generate_uri();
1022     if (!st.ok()) {
1023       logger_->status(st);
1024       // Clean up
1025       tdb_delete(array_schema);
1026       return st;
1027     }
1028     array_schema->set_version(constants::format_version);
1029 
1030     // Create array schema directory if necessary
1031     URI array_schema_folder_uri =
1032         array_uri.join_path(constants::array_schema_folder_name);
1033     st = vfs_->create_dir(array_schema_folder_uri);
1034     if (!st.ok()) {
1035       logger_->status(st);
1036       // Clean up
1037       tdb_delete(array_schema);
1038       return st;
1039     }
1040 
1041     st = store_array_schema(array_schema, encryption_key_cfg);
1042     if (!st.ok()) {
1043       logger_->status(st);
1044       // Clean up
1045       tdb_delete(array_schema);
1046       return st;
1047     }
1048   }
1049 
1050   // Clean up
1051   tdb_delete(array_schema);
1052 
1053   return Status::Ok();
1054 }
1055 
array_memory_tracker(const URI & array_uri,bool top_level)1056 OpenArrayMemoryTracker* StorageManager::array_memory_tracker(
1057     const URI& array_uri, bool top_level) {
1058   // Lock mutex
1059   std::lock_guard<std::mutex> lock{open_array_for_reads_mtx_};
1060 
1061   // Find the open array to retrieve the memory tracker.
1062   auto it = open_arrays_for_reads_.find(array_uri.to_string());
1063   if (it == open_arrays_for_reads_.end()) {
1064     if (top_level) {
1065       // TODO remove this work around once VCF runs on only context.
1066       // The memory tracker could live on another context, try to find it.
1067       auto& global_state = global_state::GlobalState::GetGlobalState();
1068       return global_state.array_memory_tracker(array_uri, this);
1069     } else {
1070       return nullptr;
1071     }
1072   }
1073 
1074   return it->second->memory_tracker();
1075 }
1076 
array_get_non_empty_domain(Array * array,NDRange * domain,bool * is_empty)1077 Status StorageManager::array_get_non_empty_domain(
1078     Array* array, NDRange* domain, bool* is_empty) {
1079   if (domain == nullptr)
1080     return logger_->status(Status::StorageManagerError(
1081         "Cannot get non-empty domain; Domain object is null"));
1082 
1083   if (array == nullptr)
1084     return logger_->status(Status::StorageManagerError(
1085         "Cannot get non-empty domain; Array object is null"));
1086 
1087   if (!array->is_remote() &&
1088       open_arrays_for_reads_.find(array->array_uri().to_string()) ==
1089           open_arrays_for_reads_.end())
1090     return logger_->status(Status::StorageManagerError(
1091         "Cannot get non-empty domain; Array not opened for reads"));
1092 
1093   *domain = array->non_empty_domain();
1094   *is_empty = domain->empty();
1095 
1096   return Status::Ok();
1097 }
1098 
array_get_non_empty_domain(Array * array,void * domain,bool * is_empty)1099 Status StorageManager::array_get_non_empty_domain(
1100     Array* array, void* domain, bool* is_empty) {
1101   if (array == nullptr)
1102     return logger_->status(Status::StorageManagerError(
1103         "Cannot get non-empty domain; Array object is null"));
1104 
1105   if (!array->array_schema_latest()->domain()->all_dims_same_type())
1106     return logger_->status(Status::StorageManagerError(
1107         "Cannot get non-empty domain; Function non-applicable to arrays with "
1108         "heterogenous dimensions"));
1109 
1110   if (!array->array_schema_latest()->domain()->all_dims_fixed())
1111     return logger_->status(Status::StorageManagerError(
1112         "Cannot get non-empty domain; Function non-applicable to arrays with "
1113         "variable-sized dimensions"));
1114 
1115   NDRange dom;
1116   RETURN_NOT_OK(array_get_non_empty_domain(array, &dom, is_empty));
1117   if (*is_empty)
1118     return Status::Ok();
1119 
1120   auto array_schema = array->array_schema_latest();
1121   auto dim_num = array_schema->dim_num();
1122   auto domain_c = (unsigned char*)domain;
1123   uint64_t offset = 0;
1124   for (unsigned d = 0; d < dim_num; ++d) {
1125     std::memcpy(&domain_c[offset], dom[d].data(), dom[d].size());
1126     offset += dom[d].size();
1127   }
1128 
1129   return Status::Ok();
1130 }
1131 
array_get_non_empty_domain_from_index(Array * array,unsigned idx,void * domain,bool * is_empty)1132 Status StorageManager::array_get_non_empty_domain_from_index(
1133     Array* array, unsigned idx, void* domain, bool* is_empty) {
1134   // For easy reference
1135   auto array_schema = array->array_schema_latest();
1136   auto array_domain = array_schema->domain();
1137 
1138   // Sanity checks
1139   if (idx >= array_schema->dim_num())
1140     return logger_->status(Status::StorageManagerError(
1141         "Cannot get non-empty domain; Invalid dimension index"));
1142   if (array_domain->dimension(idx)->var_size()) {
1143     std::string errmsg = "Cannot get non-empty domain; Dimension '";
1144     errmsg += array_domain->dimension(idx)->name();
1145     errmsg += "' is variable-sized";
1146     return logger_->status(Status::StorageManagerError(errmsg));
1147   }
1148 
1149   NDRange dom;
1150   RETURN_NOT_OK(array_get_non_empty_domain(array, &dom, is_empty));
1151   if (*is_empty)
1152     return Status::Ok();
1153 
1154   std::memcpy(domain, dom[idx].data(), dom[idx].size());
1155   return Status::Ok();
1156 }
1157 
array_get_non_empty_domain_from_name(Array * array,const char * name,void * domain,bool * is_empty)1158 Status StorageManager::array_get_non_empty_domain_from_name(
1159     Array* array, const char* name, void* domain, bool* is_empty) {
1160   // Sanity check
1161   if (name == nullptr)
1162     return logger_->status(Status::StorageManagerError(
1163         "Cannot get non-empty domain; Invalid dimension name"));
1164 
1165   NDRange dom;
1166   RETURN_NOT_OK(array_get_non_empty_domain(array, &dom, is_empty));
1167 
1168   auto array_schema = array->array_schema_latest();
1169   auto array_domain = array_schema->domain();
1170   auto dim_num = array_schema->dim_num();
1171   for (unsigned d = 0; d < dim_num; ++d) {
1172     auto dim_name = array_schema->dimension(d)->name();
1173     if (name == dim_name) {
1174       // Sanity check
1175       if (array_domain->dimension(d)->var_size()) {
1176         std::string errmsg = "Cannot get non-empty domain; Dimension '";
1177         errmsg += dim_name + "' is variable-sized";
1178         return logger_->status(Status::StorageManagerError(errmsg));
1179       }
1180 
1181       if (!*is_empty)
1182         std::memcpy(domain, dom[d].data(), dom[d].size());
1183       return Status::Ok();
1184     }
1185   }
1186 
1187   return logger_->status(Status::StorageManagerError(
1188       std::string("Cannot get non-empty domain; Dimension name '") + name +
1189       "' does not exist"));
1190 }
1191 
array_get_non_empty_domain_var_size_from_index(Array * array,unsigned idx,uint64_t * start_size,uint64_t * end_size,bool * is_empty)1192 Status StorageManager::array_get_non_empty_domain_var_size_from_index(
1193     Array* array,
1194     unsigned idx,
1195     uint64_t* start_size,
1196     uint64_t* end_size,
1197     bool* is_empty) {
1198   // For easy reference
1199   auto array_schema = array->array_schema_latest();
1200   auto array_domain = array_schema->domain();
1201 
1202   // Sanity checks
1203   if (idx >= array_schema->dim_num())
1204     return logger_->status(Status::StorageManagerError(
1205         "Cannot get non-empty domain; Invalid dimension index"));
1206   if (!array_domain->dimension(idx)->var_size()) {
1207     std::string errmsg = "Cannot get non-empty domain; Dimension '";
1208     errmsg += array_domain->dimension(idx)->name();
1209     errmsg += "' is fixed-sized";
1210     return logger_->status(Status::StorageManagerError(errmsg));
1211   }
1212 
1213   NDRange dom;
1214   RETURN_NOT_OK(array_get_non_empty_domain(array, &dom, is_empty));
1215   if (*is_empty) {
1216     *start_size = 0;
1217     *end_size = 0;
1218     return Status::Ok();
1219   }
1220 
1221   *start_size = dom[idx].start_size();
1222   *end_size = dom[idx].end_size();
1223 
1224   return Status::Ok();
1225 }
1226 
array_get_non_empty_domain_var_size_from_name(Array * array,const char * name,uint64_t * start_size,uint64_t * end_size,bool * is_empty)1227 Status StorageManager::array_get_non_empty_domain_var_size_from_name(
1228     Array* array,
1229     const char* name,
1230     uint64_t* start_size,
1231     uint64_t* end_size,
1232     bool* is_empty) {
1233   // Sanity check
1234   if (name == nullptr)
1235     return logger_->status(Status::StorageManagerError(
1236         "Cannot get non-empty domain; Invalid dimension name"));
1237 
1238   NDRange dom;
1239   RETURN_NOT_OK(array_get_non_empty_domain(array, &dom, is_empty));
1240 
1241   auto array_schema = array->array_schema_latest();
1242   auto array_domain = array_schema->domain();
1243   auto dim_num = array_schema->dim_num();
1244   for (unsigned d = 0; d < dim_num; ++d) {
1245     auto dim_name = array_schema->dimension(d)->name();
1246     if (name == dim_name) {
1247       // Sanity check
1248       if (!array_domain->dimension(d)->var_size()) {
1249         std::string errmsg = "Cannot get non-empty domain; Dimension '";
1250         errmsg += dim_name + "' is fixed-sized";
1251         return logger_->status(Status::StorageManagerError(errmsg));
1252       }
1253 
1254       if (*is_empty) {
1255         *start_size = 0;
1256         *end_size = 0;
1257       } else {
1258         *start_size = dom[d].start_size();
1259         *end_size = dom[d].end_size();
1260       }
1261 
1262       return Status::Ok();
1263     }
1264   }
1265 
1266   return logger_->status(Status::StorageManagerError(
1267       std::string("Cannot get non-empty domain; Dimension name '") + name +
1268       "' does not exist"));
1269 }
1270 
array_get_non_empty_domain_var_from_index(Array * array,unsigned idx,void * start,void * end,bool * is_empty)1271 Status StorageManager::array_get_non_empty_domain_var_from_index(
1272     Array* array, unsigned idx, void* start, void* end, bool* is_empty) {
1273   // For easy reference
1274   auto array_schema = array->array_schema_latest();
1275   auto array_domain = array_schema->domain();
1276 
1277   // Sanity checks
1278   if (idx >= array_schema->dim_num())
1279     return logger_->status(Status::StorageManagerError(
1280         "Cannot get non-empty domain; Invalid dimension index"));
1281   if (!array_domain->dimension(idx)->var_size()) {
1282     std::string errmsg = "Cannot get non-empty domain; Dimension '";
1283     errmsg += array_domain->dimension(idx)->name();
1284     errmsg += "' is fixed-sized";
1285     return logger_->status(Status::StorageManagerError(errmsg));
1286   }
1287 
1288   NDRange dom;
1289   RETURN_NOT_OK(array_get_non_empty_domain(array, &dom, is_empty));
1290 
1291   if (*is_empty)
1292     return Status::Ok();
1293 
1294   std::memcpy(start, dom[idx].start(), dom[idx].start_size());
1295   std::memcpy(end, dom[idx].end(), dom[idx].end_size());
1296 
1297   return Status::Ok();
1298 }
1299 
array_get_non_empty_domain_var_from_name(Array * array,const char * name,void * start,void * end,bool * is_empty)1300 Status StorageManager::array_get_non_empty_domain_var_from_name(
1301     Array* array, const char* name, void* start, void* end, bool* is_empty) {
1302   // Sanity check
1303   if (name == nullptr)
1304     return logger_->status(Status::StorageManagerError(
1305         "Cannot get non-empty domain; Invalid dimension name"));
1306 
1307   NDRange dom;
1308   RETURN_NOT_OK(array_get_non_empty_domain(array, &dom, is_empty));
1309 
1310   auto array_schema = array->array_schema_latest();
1311   auto array_domain = array_schema->domain();
1312   auto dim_num = array_schema->dim_num();
1313   for (unsigned d = 0; d < dim_num; ++d) {
1314     auto dim_name = array_schema->dimension(d)->name();
1315     if (name == dim_name) {
1316       // Sanity check
1317       if (!array_domain->dimension(d)->var_size()) {
1318         std::string errmsg = "Cannot get non-empty domain; Dimension '";
1319         errmsg += dim_name + "' is fixed-sized";
1320         return logger_->status(Status::StorageManagerError(errmsg));
1321       }
1322 
1323       if (!*is_empty) {
1324         std::memcpy(start, dom[d].start(), dom[d].start_size());
1325         std::memcpy(end, dom[d].end(), dom[d].end_size());
1326       }
1327 
1328       return Status::Ok();
1329     }
1330   }
1331 
1332   return logger_->status(Status::StorageManagerError(
1333       std::string("Cannot get non-empty domain; Dimension name '") + name +
1334       "' does not exist"));
1335 }
1336 
array_get_encryption(const std::string & array_uri,EncryptionType * encryption_type)1337 Status StorageManager::array_get_encryption(
1338     const std::string& array_uri, EncryptionType* encryption_type) {
1339   URI uri(array_uri);
1340 
1341   if (uri.is_invalid())
1342     return logger_->status(Status::StorageManagerError(
1343         "Cannot get array encryption; Invalid array URI"));
1344 
1345   URI schema_uri;
1346   RETURN_NOT_OK(get_latest_array_schema_uri(uri, &schema_uri));
1347 
1348   // Read tile header.
1349   GenericTileIO::GenericTileHeader header;
1350   RETURN_NOT_OK(
1351       GenericTileIO::read_generic_tile_header(this, schema_uri, 0, &header));
1352   *encryption_type = static_cast<EncryptionType>(header.encryption_type);
1353 
1354   return Status::Ok();
1355 }
1356 
/*
 * Acquires an exclusive lock on the array at `array_uri`.
 *
 * Three steps:
 *  1. `xlock_mtx_` is locked. On success it stays locked after this function
 *     returns and is released by `array_xunlock`; if the filelock cannot be
 *     acquired, it is unlocked here before returning the error.
 *  2. While holding `open_array_for_reads_mtx_`, the thread waits on
 *     `xlock_cv_` until the array URI is no longer in
 *     `open_arrays_for_reads_`.
 *  3. A filelock on the array's filelock object is taken through the VFS and
 *     recorded in `xfilelocks_` for `array_xunlock` to release.
 *
 * @param array_uri The URI of the array to lock.
 * @return Status
 */
Status StorageManager::array_xlock(const URI& array_uri) {
  // Get exclusive lock for threads
  xlock_mtx_.lock();

  // Wait until the array is closed for reads
  // NOTE(review): `lk` stays held until this function returns, i.e. also
  // while the filelock below is being acquired.
  std::unique_lock<std::mutex> lk(open_array_for_reads_mtx_);
  xlock_cv_.wait(lk, [this, array_uri] {
    return open_arrays_for_reads_.find(array_uri.to_string()) ==
           open_arrays_for_reads_.end();
  });

  // Get exclusive lock for processes through a filelock
  // (the `false` argument presumably selects an exclusive rather than shared
  // filelock — TODO confirm against VFS::filelock_lock). On failure, the
  // thread-level lock taken above is released before returning.
  filelock_t filelock = INVALID_FILELOCK;
  auto lock_uri = array_uri.join_path(constants::filelock_name);
  RETURN_NOT_OK_ELSE(
      vfs_->filelock_lock(lock_uri, &filelock, false), xlock_mtx_.unlock());
  xfilelocks_[array_uri.to_string()] = filelock;

  return Status::Ok();
}
1377 
array_xunlock(const URI & array_uri)1378 Status StorageManager::array_xunlock(const URI& array_uri) {
1379   // Get filelock if it exists
1380   auto it = xfilelocks_.find(array_uri.to_string());
1381   if (it == xfilelocks_.end())
1382     return logger_->status(Status::StorageManagerError(
1383         "Cannot unlock array exclusive lock; Filelock not found"));
1384   auto filelock = it->second;
1385 
1386   // Release exclusive lock for processes through the filelock
1387   auto lock_uri = array_uri.join_path(constants::filelock_name);
1388   if (filelock != INVALID_FILELOCK)
1389     RETURN_NOT_OK(vfs_->filelock_unlock(lock_uri));
1390   xfilelocks_.erase(it);
1391 
1392   // Release exclusive lock for threads
1393   xlock_mtx_.unlock();
1394 
1395   return Status::Ok();
1396 }
1397 
async_push_query(Query * query)1398 Status StorageManager::async_push_query(Query* query) {
1399   cancelable_tasks_.execute(
1400       compute_tp_,
1401       [this, query]() {
1402         // Process query.
1403         Status st = query_submit(query);
1404         if (!st.ok())
1405           logger_->status(st);
1406         return st;
1407       },
1408       [query]() {
1409         // Task was cancelled. This is safe to perform in a separate thread,
1410         // as we are guaranteed by the thread pool not to have entered
1411         // query->process() yet.
1412         query->cancel();
1413       });
1414 
1415   return Status::Ok();
1416 }
1417 
cancel_all_tasks()1418 Status StorageManager::cancel_all_tasks() {
1419   // Check if there is already a "cancellation" in progress.
1420   bool handle_cancel = false;
1421   {
1422     std::unique_lock<std::mutex> lck(cancellation_in_progress_mtx_);
1423     if (!cancellation_in_progress_) {
1424       cancellation_in_progress_ = true;
1425       handle_cancel = true;
1426     }
1427   }
1428 
1429   // Handle the cancellation.
1430   if (handle_cancel) {
1431     // Cancel any queued tasks.
1432     cancelable_tasks_.cancel_all_tasks();
1433 
1434     // Only call VFS cancel if the object has been constructed
1435     if (vfs_ != nullptr)
1436       vfs_->cancel_all_tasks();
1437 
1438     // Wait for in-progress queries to finish.
1439     wait_for_zero_in_progress();
1440 
1441     // Reset the cancellation flag.
1442     std::unique_lock<std::mutex> lck(cancellation_in_progress_mtx_);
1443     cancellation_in_progress_ = false;
1444   }
1445 
1446   return Status::Ok();
1447 }
1448 
cancellation_in_progress()1449 bool StorageManager::cancellation_in_progress() {
1450   std::unique_lock<std::mutex> lck(cancellation_in_progress_mtx_);
1451   return cancellation_in_progress_;
1452 }
1453 
config() const1454 const Config& StorageManager::config() const {
1455   return config_;
1456 }
1457 
create_dir(const URI & uri)1458 Status StorageManager::create_dir(const URI& uri) {
1459   return vfs_->create_dir(uri);
1460 }
1461 
is_dir(const URI & uri,bool * is_dir) const1462 Status StorageManager::is_dir(const URI& uri, bool* is_dir) const {
1463   return vfs_->is_dir(uri, is_dir);
1464 }
1465 
touch(const URI & uri)1466 Status StorageManager::touch(const URI& uri) {
1467   return vfs_->touch(uri);
1468 }
1469 
decrement_in_progress()1470 void StorageManager::decrement_in_progress() {
1471   std::unique_lock<std::mutex> lck(queries_in_progress_mtx_);
1472   queries_in_progress_--;
1473   queries_in_progress_cv_.notify_all();
1474 }
1475 
object_remove(const char * path) const1476 Status StorageManager::object_remove(const char* path) const {
1477   auto uri = URI(path);
1478   if (uri.is_invalid())
1479     return logger_->status(Status::StorageManagerError(
1480         std::string("Cannot remove object '") + path + "'; Invalid URI"));
1481 
1482   ObjectType obj_type;
1483   RETURN_NOT_OK(object_type(uri, &obj_type));
1484   if (obj_type == ObjectType::INVALID)
1485     return logger_->status(Status::StorageManagerError(
1486         std::string("Cannot remove object '") + path +
1487         "'; Invalid TileDB object"));
1488 
1489   return vfs_->remove_dir(uri);
1490 }
1491 
object_move(const char * old_path,const char * new_path) const1492 Status StorageManager::object_move(
1493     const char* old_path, const char* new_path) const {
1494   auto old_uri = URI(old_path);
1495   if (old_uri.is_invalid())
1496     return logger_->status(Status::StorageManagerError(
1497         std::string("Cannot move object '") + old_path + "'; Invalid URI"));
1498 
1499   auto new_uri = URI(new_path);
1500   if (new_uri.is_invalid())
1501     return logger_->status(Status::StorageManagerError(
1502         std::string("Cannot move object to '") + new_path + "'; Invalid URI"));
1503 
1504   ObjectType obj_type;
1505   RETURN_NOT_OK(object_type(old_uri, &obj_type));
1506   if (obj_type == ObjectType::INVALID)
1507     return logger_->status(Status::StorageManagerError(
1508         std::string("Cannot move object '") + old_path +
1509         "'; Invalid TileDB object"));
1510 
1511   return vfs_->move_dir(old_uri, new_uri);
1512 }
1513 
get_fragment_info(const Array & array,uint64_t timestamp_start,uint64_t timestamp_end,FragmentInfo * fragment_info,bool get_to_vacuum)1514 Status StorageManager::get_fragment_info(
1515     const Array& array,
1516     uint64_t timestamp_start,
1517     uint64_t timestamp_end,
1518     FragmentInfo* fragment_info,
1519     bool get_to_vacuum) {
1520   fragment_info->clear();
1521 
1522   // Open array for reading
1523   auto array_schema = array.array_schema_latest();
1524   auto array_type = array_schema->array_type();
1525 
1526   fragment_info->set_dim_info(
1527       array_schema->dim_names(), array_schema->dim_types());
1528 
1529   // Get encryption key from config
1530   auto&& [st, array_schema_opt, array_schemas_opt, fragment_metadata_opt] =
1531       array_reopen(
1532           array.array_uri(),
1533           *array.encryption_key(),
1534           array_type == ArrayType::SPARSE ? timestamp_start : 0,
1535           timestamp_end);
1536 
1537   if (!st.ok())
1538     return st;
1539 
1540   // Return if array is empty
1541   if (fragment_metadata_opt.value().empty())
1542     return Status::Ok();
1543 
1544   const auto& fragment_metadata = fragment_metadata_opt.value();
1545 
1546   std::vector<uint64_t> sizes(fragment_metadata.size());
1547 
1548   RETURN_NOT_OK(parallel_for(
1549       this->compute_tp_,
1550       0,
1551       fragment_metadata.size(),
1552       [&fragment_metadata, &sizes](uint64_t i) {
1553         const auto meta = fragment_metadata[i];
1554 
1555         // Get fragment size
1556         uint64_t size;
1557         RETURN_NOT_OK(meta->fragment_size(&size));
1558         sizes[i] = size;
1559 
1560         return Status::Ok();
1561       }));
1562 
1563   for (uint64_t i = 0; i < fragment_metadata.size(); i++) {
1564     const auto meta = fragment_metadata[i];
1565     const auto& non_empty_domain = meta->non_empty_domain();
1566 
1567     if (meta->timestamp_range().first < timestamp_start) {
1568       fragment_info->expand_anterior_ndrange(
1569           meta->array_schema()->domain(), non_empty_domain);
1570     } else {
1571       const auto& uri = meta->fragment_uri();
1572       bool sparse = !meta->dense();
1573 
1574       // compute expanded non-empty domain (only for dense fragments)
1575       auto expanded_non_empty_domain = non_empty_domain;
1576       if (!sparse)
1577         meta->array_schema()->domain()->expand_to_tiles(
1578             &expanded_non_empty_domain);
1579 
1580       // Push new fragment info
1581       fragment_info->append(SingleFragmentInfo(
1582           uri,
1583           sparse,
1584           meta->timestamp_range(),
1585           sizes[i],
1586           non_empty_domain,
1587           expanded_non_empty_domain,
1588           meta));
1589     }
1590   }
1591 
1592   // Optionally get the URIs to vacuum
1593   if (get_to_vacuum) {
1594     std::vector<URI> to_vacuum, vac_uris, fragment_uris;
1595     URI meta_uri;
1596     RETURN_NOT_OK(
1597         get_fragment_uris(array.array_uri(), &fragment_uris, &meta_uri));
1598     RETURN_NOT_OK(get_uris_to_vacuum(
1599         fragment_uris, timestamp_start, timestamp_end, &to_vacuum, &vac_uris));
1600     fragment_info->set_to_vacuum(to_vacuum);
1601   }
1602 
1603   return Status::Ok();
1604 }
1605 
get_fragment_uris(const URI & array_uri,std::vector<URI> * fragment_uris,URI * meta_uri) const1606 Status StorageManager::get_fragment_uris(
1607     const URI& array_uri,
1608     std::vector<URI>* fragment_uris,
1609     URI* meta_uri) const {
1610   auto timer_se = stats_->start_timer("read_get_fragment_uris");
1611   // Get all uris in the array directory
1612   std::vector<URI> uris;
1613   RETURN_NOT_OK(vfs_->ls(array_uri.add_trailing_slash(), &uris));
1614 
1615   // Get the fragments that have special "ok" URIs, which indicate
1616   // that fragments are "committed" for versions >= 5
1617   std::set<URI> ok_uris;
1618   for (size_t i = 0; i < uris.size(); ++i) {
1619     if (utils::parse::ends_with(
1620             uris[i].to_string(), constants::ok_file_suffix)) {
1621       auto name = uris[i].to_string();
1622       name = name.substr(0, name.size() - constants::ok_file_suffix.size());
1623       ok_uris.emplace(URI(name));
1624     }
1625   }
1626 
1627   // Get only the committed fragment uris
1628   std::vector<int> is_fragment(uris.size(), 0);
1629   auto status = parallel_for(compute_tp_, 0, uris.size(), [&](size_t i) {
1630     if (utils::parse::starts_with(uris[i].last_path_part(), "."))
1631       return Status::Ok();
1632     RETURN_NOT_OK(this->is_fragment(uris[i], ok_uris, &is_fragment[i]));
1633     return Status::Ok();
1634   });
1635   RETURN_NOT_OK(status);
1636 
1637   for (size_t i = 0; i < uris.size(); ++i) {
1638     if (is_fragment[i])
1639       fragment_uris->emplace_back(uris[i]);
1640     else if (this->is_vacuum_file(uris[i]))
1641       fragment_uris->emplace_back(uris[i]);
1642   }
1643 
1644   // Get the latest consolidated fragment metadata URI
1645   RETURN_NOT_OK(get_consolidated_fragment_meta_uri(uris, meta_uri));
1646 
1647   return Status::Ok();
1648 }
1649 
tags() const1650 const std::unordered_map<std::string, std::string>& StorageManager::tags()
1651     const {
1652   return tags_;
1653 }
1654 
get_fragment_info(const Array & array,const URI & fragment_uri,SingleFragmentInfo * fragment_info)1655 Status StorageManager::get_fragment_info(
1656     const Array& array,
1657     const URI& fragment_uri,
1658     SingleFragmentInfo* fragment_info) {
1659   // Get timestamp range
1660   std::pair<uint64_t, uint64_t> timestamp_range;
1661   RETURN_NOT_OK(
1662       utils::parse::get_timestamp_range(fragment_uri, &timestamp_range));
1663   uint32_t version;
1664   auto name = fragment_uri.remove_trailing_slash().last_path_part();
1665   RETURN_NOT_OK(utils::parse::get_fragment_name_version(name, &version));
1666 
1667   // Check if fragment is sparse
1668   bool sparse = false;
1669   if (version == 1) {  // This corresponds to format version <=2
1670     URI coords_uri =
1671         fragment_uri.join_path(constants::coords + constants::file_suffix);
1672     RETURN_NOT_OK(vfs_->is_file(coords_uri, &sparse));
1673   } else {
1674     // Do nothing. It does not matter what the `sparse` value
1675     // is, since the FragmentMetadata object will load the correct
1676     // value from the metadata file.
1677 
1678     // Also `sparse` is updated below after loading the metadata
1679   }
1680 
1681   // Get fragment non-empty domain
1682   auto meta = tdb_make_shared(
1683       FragmentMetadata,
1684       this,
1685       array.array_schema_latest(),
1686       fragment_uri,
1687       timestamp_range,
1688       !sparse);
1689   RETURN_NOT_OK(meta->load(
1690       *array.encryption_key(), nullptr, 0, array.array_schemas_all()));
1691 
1692   // This is important for format version > 2
1693   sparse = !meta->dense();
1694 
1695   // Get fragment size
1696   uint64_t size;
1697   RETURN_NOT_OK(meta->fragment_size(&size));
1698 
1699   // Compute expanded non-empty domain only for dense fragments
1700   // Get non-empty domain, and compute expanded non-empty domain
1701   // (only for dense fragments)
1702   const auto& non_empty_domain = meta->non_empty_domain();
1703   auto expanded_non_empty_domain = non_empty_domain;
1704   if (!sparse)
1705     array.array_schema_latest()->domain()->expand_to_tiles(
1706         &expanded_non_empty_domain);
1707 
1708   // Set fragment info
1709   *fragment_info = SingleFragmentInfo(
1710       fragment_uri,
1711       sparse,
1712       timestamp_range,
1713       size,
1714       non_empty_domain,
1715       expanded_non_empty_domain,
1716       meta);
1717 
1718   return Status::Ok();
1719 }
1720 
group_create(const std::string & group)1721 Status StorageManager::group_create(const std::string& group) {
1722   // Create group URI
1723   URI uri(group);
1724   if (uri.is_invalid())
1725     return logger_->status(Status::StorageManagerError(
1726         "Cannot create group '" + group + "'; Invalid group URI"));
1727 
1728   // Check if group exists
1729   bool exists;
1730   RETURN_NOT_OK(is_group(uri, &exists));
1731   if (exists)
1732     return logger_->status(Status::StorageManagerError(
1733         std::string("Cannot create group; Group '") + uri.c_str() +
1734         "' already exists"));
1735 
1736   std::lock_guard<std::mutex> lock{object_create_mtx_};
1737 
1738   // Create group directory
1739   RETURN_NOT_OK(vfs_->create_dir(uri));
1740 
1741   // Create group file
1742   URI group_filename = uri.join_path(constants::group_filename);
1743   Status st = vfs_->touch(group_filename);
1744   if (!st.ok()) {
1745     vfs_->remove_dir(uri);
1746     return st;
1747   }
1748   return st;
1749 }
1750 
init(const Config * config)1751 Status StorageManager::init(const Config* config) {
1752   if (config != nullptr)
1753     config_ = *config;
1754 
1755   // Get config params
1756   bool found = false;
1757   uint64_t tile_cache_size = 0;
1758   RETURN_NOT_OK(
1759       config_.get<uint64_t>("sm.tile_cache_size", &tile_cache_size, &found));
1760   assert(found);
1761 
1762   tile_cache_ =
1763       tdb_unique_ptr<BufferLRUCache>(tdb_new(BufferLRUCache, tile_cache_size));
1764 
1765   // GlobalState must be initialized before `vfs->init` because S3::init calls
1766   // GetGlobalState
1767   auto& global_state = global_state::GlobalState::GetGlobalState();
1768   RETURN_NOT_OK(global_state.init(config));
1769 
1770   vfs_ = tdb_new(VFS);
1771   RETURN_NOT_OK(vfs_->init(stats_, compute_tp_, io_tp_, &config_, nullptr));
1772 #ifdef TILEDB_SERIALIZATION
1773   RETURN_NOT_OK(init_rest_client());
1774 #endif
1775 
1776   RETURN_NOT_OK(set_default_tags());
1777 
1778   global_state.register_storage_manager(this);
1779 
1780   return Status::Ok();
1781 }
1782 
compute_tp()1783 ThreadPool* StorageManager::compute_tp() {
1784   return compute_tp_;
1785 }
1786 
io_tp()1787 ThreadPool* StorageManager::io_tp() {
1788   return io_tp_;
1789 }
1790 
rest_client() const1791 RestClient* StorageManager::rest_client() const {
1792   return rest_client_.get();
1793 }
1794 
increment_in_progress()1795 void StorageManager::increment_in_progress() {
1796   std::unique_lock<std::mutex> lck(queries_in_progress_mtx_);
1797   queries_in_progress_++;
1798   queries_in_progress_cv_.notify_all();
1799 }
1800 
is_array(const URI & uri,bool * is_array) const1801 Status StorageManager::is_array(const URI& uri, bool* is_array) const {
1802   // Check if the schema directory exists or not
1803   bool is_dir = false;
1804   // Since is_dir could return NOT Ok status, we will not use RETURN_NOT_OK here
1805   Status st =
1806       vfs_->is_dir(uri.join_path(constants::array_schema_folder_name), &is_dir);
1807   if (st.ok() && is_dir) {
1808     *is_array = true;
1809     return Status::Ok();
1810   }
1811 
1812   // If there is no schema directory, we check schema file
1813   RETURN_NOT_OK(
1814       vfs_->is_file(uri.join_path(constants::array_schema_filename), is_array));
1815   return Status::Ok();
1816 }
1817 
is_file(const URI & uri,bool * is_file) const1818 Status StorageManager::is_file(const URI& uri, bool* is_file) const {
1819   RETURN_NOT_OK(vfs_->is_file(uri, is_file));
1820   return Status::Ok();
1821 }
1822 
is_fragment(const URI & uri,const std::set<URI> & ok_uris,int * is_fragment) const1823 Status StorageManager::is_fragment(
1824     const URI& uri, const std::set<URI>& ok_uris, int* is_fragment) const {
1825   // If the URI name has a suffix, then it is not a fragment
1826   auto name = uri.remove_trailing_slash().last_path_part();
1827   if (name.find_first_of('.') != std::string::npos) {
1828     *is_fragment = 0;
1829     return Status::Ok();
1830   }
1831 
1832   // Check set membership in ok_uris
1833   if (ok_uris.find(uri) != ok_uris.end()) {
1834     *is_fragment = 1;
1835     return Status::Ok();
1836   }
1837 
1838   // If the format version is >= 5, then the above suffices to check if
1839   // the URI is indeed a fragment
1840   uint32_t version;
1841   RETURN_NOT_OK(utils::parse::get_fragment_version(name, &version));
1842   if (version != UINT32_MAX && version >= 5) {
1843     *is_fragment = false;
1844     return Status::Ok();
1845   }
1846 
1847   // Versions < 5
1848   bool is_file;
1849   RETURN_NOT_OK(vfs_->is_file(
1850       uri.join_path(constants::fragment_metadata_filename), &is_file));
1851   *is_fragment = (int)is_file;
1852   return Status::Ok();
1853 }
1854 
is_group(const URI & uri,bool * is_group) const1855 Status StorageManager::is_group(const URI& uri, bool* is_group) const {
1856   RETURN_NOT_OK(
1857       vfs_->is_file(uri.join_path(constants::group_filename), is_group));
1858   return Status::Ok();
1859 }
1860 
is_vacuum_file(const URI & uri) const1861 bool StorageManager::is_vacuum_file(const URI& uri) const {
1862   // If the URI name has a suffix, then it is not a fragment
1863   if (utils::parse::ends_with(uri.to_string(), constants::vacuum_file_suffix))
1864     return true;
1865 
1866   return false;
1867 }
1868 
get_array_schema_uris(const URI & array_uri,std::vector<URI> * schema_uris) const1869 Status StorageManager::get_array_schema_uris(
1870     const URI& array_uri, std::vector<URI>* schema_uris) const {
1871   auto timer_se = stats_->start_timer("read_get_array_schema_uris");
1872 
1873   schema_uris->clear();
1874   URI old_schema_uri = array_uri.join_path(constants::array_schema_filename);
1875   bool has_file = false;
1876   RETURN_NOT_OK(vfs_->is_file(old_schema_uri, &has_file));
1877   if (has_file) {
1878     schema_uris->push_back(old_schema_uri);
1879   }
1880 
1881   URI schema_folder_uri =
1882       array_uri.join_path(constants::array_schema_folder_name);
1883   // Check if schema_folder_uri exists. For some file systems, such as win, ls
1884   // will return error if the folder does not exist.
1885   bool has_dir = false;
1886   RETURN_NOT_OK(vfs_->is_dir(schema_folder_uri, &has_dir));
1887   if (has_dir) {
1888     std::vector<URI> array_schema_uris;
1889     RETURN_NOT_OK(vfs_->ls(schema_folder_uri, &array_schema_uris));
1890     if (array_schema_uris.size() > 0) {
1891       schema_uris->reserve(schema_uris->size() + array_schema_uris.size());
1892       std::copy(
1893           array_schema_uris.begin(),
1894           array_schema_uris.end(),
1895           std::back_inserter(*schema_uris));
1896     }
1897   }
1898 
1899   // Check if schema_uris is empty
1900   if (schema_uris->empty()) {
1901     return logger_->status(Status::StorageManagerError(
1902         "Can not get the array schemas; No array schemas found."));
1903   }
1904 
1905   return Status::Ok();
1906 }
1907 
get_latest_array_schema_uri(const URI & array_uri,URI * uri) const1908 Status StorageManager::get_latest_array_schema_uri(
1909     const URI& array_uri, URI* uri) const {
1910   auto timer_se = stats_->start_timer("read_get_latest_array_schema_uri");
1911 
1912   std::vector<URI> schema_uris;
1913   RETURN_NOT_OK(get_array_schema_uris(array_uri, &schema_uris));
1914   if (schema_uris.size() == 0) {
1915     return logger_->status(Status::StorageManagerError(
1916         "Can not get the latest array schema; No array schemas found."));
1917   }
1918   *uri = schema_uris.back();
1919   if (uri->is_invalid()) {
1920     return logger_->status(
1921         Status::StorageManagerError("Could not find array schema URI"));
1922   }
1923 
1924   return Status::Ok();
1925 }
1926 
load_array_schema_from_uri(const URI & schema_uri,const EncryptionKey & encryption_key,ArraySchema ** array_schema)1927 Status StorageManager::load_array_schema_from_uri(
1928     const URI& schema_uri,
1929     const EncryptionKey& encryption_key,
1930     ArraySchema** array_schema) {
1931   auto timer_se = stats_->start_timer("read_load_array_schema_from_uri");
1932 
1933   GenericTileIO tile_io(this, schema_uri);
1934   Tile* tile = nullptr;
1935 
1936   // Get encryption key from config
1937   if (encryption_key.encryption_type() == EncryptionType::NO_ENCRYPTION) {
1938     bool found = false;
1939     std::string encryption_key_from_cfg =
1940         config_.get("sm.encryption_key", &found);
1941     assert(found);
1942     std::string encryption_type_from_cfg =
1943         config_.get("sm.encryption_type", &found);
1944     assert(found);
1945     auto [st, etc] = encryption_type_enum(encryption_type_from_cfg);
1946     RETURN_NOT_OK(st);
1947     EncryptionType encryption_type_cfg = etc.value();
1948 
1949     EncryptionKey encryption_key_cfg;
1950     if (encryption_key_from_cfg.empty()) {
1951       RETURN_NOT_OK(
1952           encryption_key_cfg.set_key(encryption_type_cfg, nullptr, 0));
1953     } else {
1954       uint32_t key_length = 0;
1955       if (EncryptionKey::is_valid_key_length(
1956               encryption_type_cfg,
1957               static_cast<uint32_t>(encryption_key_from_cfg.size()))) {
1958         const UnitTestConfig& unit_test_cfg = UnitTestConfig::instance();
1959         if (unit_test_cfg.array_encryption_key_length.is_set()) {
1960           key_length = unit_test_cfg.array_encryption_key_length.get();
1961         } else {
1962           key_length = static_cast<uint32_t>(encryption_key_from_cfg.size());
1963         }
1964       }
1965       RETURN_NOT_OK(encryption_key_cfg.set_key(
1966           encryption_type_cfg,
1967           (const void*)encryption_key_from_cfg.c_str(),
1968           key_length));
1969     }
1970     RETURN_NOT_OK(tile_io.read_generic(&tile, 0, encryption_key_cfg, config_));
1971   } else {
1972     RETURN_NOT_OK(tile_io.read_generic(&tile, 0, encryption_key, config_));
1973   }
1974 
1975   auto buffer = tile->buffer();
1976   Buffer buff;
1977   buff.realloc(buffer->size());
1978   buff.set_size(buffer->size());
1979   RETURN_NOT_OK_ELSE(buffer->read(buff.data(), buff.size()), tdb_delete(tile));
1980   tdb_delete(tile);
1981 
1982   stats_->add_counter("read_array_schema_size", buff.size());
1983 
1984   // Deserialize
1985   ConstBuffer cbuff(&buff);
1986   *array_schema = tdb_new(ArraySchema);
1987   Status st = (*array_schema)->deserialize(&cbuff);
1988   if (!st.ok()) {
1989     tdb_delete(*array_schema);
1990     *array_schema = nullptr;
1991     return st;
1992   }
1993   (*array_schema)->set_uri(schema_uri);
1994   return st;
1995 }
1996 
load_array_schema(const URI & array_uri,const EncryptionKey & encryption_key,ArraySchema ** array_schema)1997 Status StorageManager::load_array_schema(
1998     const URI& array_uri,
1999     const EncryptionKey& encryption_key,
2000     ArraySchema** array_schema) {
2001   auto timer_se = stats_->start_timer("read_load_array_schema");
2002 
2003   if (array_uri.is_invalid())
2004     return logger_->status(Status::StorageManagerError(
2005         "Cannot load array schema; Invalid array URI"));
2006 
2007   URI schema_uri;
2008   RETURN_NOT_OK(get_latest_array_schema_uri(array_uri, &schema_uri));
2009 
2010   RETURN_NOT_OK(
2011       load_array_schema_from_uri(schema_uri, encryption_key, array_schema));
2012   (*array_schema)->set_array_uri(array_uri);
2013   return Status::Ok();
2014 }
2015 
2016 std::tuple<
2017     Status,
2018     std::optional<std::unordered_map<std::string, tdb_shared_ptr<ArraySchema>>>>
load_all_array_schemas(const URI & array_uri,const EncryptionKey & encryption_key)2019 StorageManager::load_all_array_schemas(
2020     const URI& array_uri, const EncryptionKey& encryption_key) {
2021   auto timer_se = stats_->start_timer("read_load_all_array_schemas");
2022 
2023   if (array_uri.is_invalid())
2024     return {logger_->status(Status::StorageManagerError(
2025                 "Cannot load all array schemas; Invalid array URI")),
2026             std::nullopt};
2027 
2028   std::vector<URI> schema_uris;
2029   RETURN_NOT_OK_TUPLE(get_array_schema_uris(array_uri, &schema_uris));
2030   if (schema_uris.empty()) {
2031     return {
2032         logger_->status(Status::StorageManagerError(
2033             "Can not get the array schema vector; No array schemas found.")),
2034         std::nullopt};
2035   }
2036 
2037   std::vector<ArraySchema*> schema_vector;
2038   auto schema_num = schema_uris.size();
2039   schema_vector.resize(schema_num);
2040 
2041   auto status =
2042       parallel_for(compute_tp_, 0, schema_num, [&](size_t schema_ith) {
2043         auto& schema_uri = schema_uris[schema_ith];
2044         auto array_schema = (ArraySchema*)nullptr;
2045         RETURN_NOT_OK(load_array_schema_from_uri(
2046             schema_uri, encryption_key, &array_schema));
2047         schema_vector[schema_ith] = array_schema;
2048         return Status::Ok();
2049       });
2050   RETURN_NOT_OK_TUPLE(status);
2051 
2052   std::unordered_map<std::string, tdb_shared_ptr<ArraySchema>> array_schemas;
2053   for (const auto& array_schema : schema_vector) {
2054     array_schemas[array_schema->name()] =
2055         tdb_shared_ptr<ArraySchema>(array_schema);
2056   }
2057 
2058   return {Status::Ok(), array_schemas};
2059 }
2060 
load_array_metadata(const URI & array_uri,const EncryptionKey & encryption_key,uint64_t timestamp_start,uint64_t timestamp_end,Metadata * metadata)2061 Status StorageManager::load_array_metadata(
2062     const URI& array_uri,
2063     const EncryptionKey& encryption_key,
2064     uint64_t timestamp_start,
2065     uint64_t timestamp_end,
2066     Metadata* metadata) {
2067   auto timer_se = stats_->start_timer("read_load_array_meta");
2068 
2069   OpenArray* open_array;
2070   // Lock mutex
2071   {
2072     std::lock_guard<std::mutex> lock{open_array_for_reads_mtx_};
2073 
2074     // Find the open array entry
2075     auto it = open_arrays_for_reads_.find(array_uri.to_string());
2076     assert(it != open_arrays_for_reads_.end());
2077     open_array = it->second;
2078 
2079     // Lock the array
2080     open_array->mtx_lock();
2081   }
2082 
2083   // Determine which array metadata to load
2084   std::vector<TimestampedURI> array_metadata_to_load;
2085   std::vector<URI> array_metadata_uris;
2086   RETURN_NOT_OK_ELSE(
2087       get_array_metadata_uris(array_uri, &array_metadata_uris),
2088       open_array->mtx_unlock());
2089   RETURN_NOT_OK_ELSE(
2090       get_sorted_uris(
2091           array_metadata_uris,
2092           &array_metadata_to_load,
2093           timestamp_start,
2094           timestamp_end),
2095       open_array->mtx_unlock());
2096 
2097   // Get the array metadata
2098   RETURN_NOT_OK_ELSE(
2099       load_array_metadata(
2100           open_array, encryption_key, array_metadata_to_load, metadata),
2101       open_array->mtx_unlock());
2102 
2103   // Unlock the array
2104   open_array->mtx_unlock();
2105 
2106   return Status::Ok();
2107 }
2108 
object_type(const URI & uri,ObjectType * type) const2109 Status StorageManager::object_type(const URI& uri, ObjectType* type) const {
2110   URI dir_uri = uri;
2111   if (uri.is_s3() || uri.is_azure() || uri.is_gcs()) {
2112     // Always add a trailing '/' in the S3/Azure/GCS case so that listing the
2113     // URI as a directory will work as expected. Listing a non-directory object
2114     // is not an error for S3/Azure/GCS.
2115     auto uri_str = uri.to_string();
2116     dir_uri =
2117         URI(utils::parse::ends_with(uri_str, "/") ? uri_str : (uri_str + "/"));
2118   } else {
2119     // For non public cloud backends, listing a non-directory is an error.
2120     bool is_dir = false;
2121     RETURN_NOT_OK(vfs_->is_dir(uri, &is_dir));
2122     if (!is_dir) {
2123       *type = ObjectType::INVALID;
2124       return Status::Ok();
2125     }
2126   }
2127 
2128   std::vector<URI> child_uris;
2129   RETURN_NOT_OK(vfs_->ls(dir_uri, &child_uris));
2130 
2131   for (const auto& child_uri : child_uris) {
2132     auto uri_str = child_uri.to_string();
2133     if (utils::parse::ends_with(uri_str, constants::group_filename)) {
2134       *type = ObjectType::GROUP;
2135       return Status::Ok();
2136     } else if (utils::parse::ends_with(
2137                    uri_str, constants::array_schema_filename)) {
2138       *type = ObjectType::ARRAY;
2139       return Status::Ok();
2140     } else if (utils::parse::ends_with(
2141                    uri_str, constants::array_schema_folder_name)) {
2142       *type = ObjectType::ARRAY;
2143       return Status::Ok();
2144     }
2145   }
2146 
2147   *type = ObjectType::INVALID;
2148   return Status::Ok();
2149 }
2150 
object_iter_begin(ObjectIter ** obj_iter,const char * path,WalkOrder order)2151 Status StorageManager::object_iter_begin(
2152     ObjectIter** obj_iter, const char* path, WalkOrder order) {
2153   // Sanity check
2154   URI path_uri(path);
2155   if (path_uri.is_invalid()) {
2156     return logger_->status(Status::StorageManagerError(
2157         "Cannot create object iterator; Invalid input path"));
2158   }
2159 
2160   // Get all contents of path
2161   std::vector<URI> uris;
2162   RETURN_NOT_OK(vfs_->ls(path_uri, &uris));
2163 
2164   // Create a new object iterator
2165   *obj_iter = tdb_new(ObjectIter);
2166   (*obj_iter)->order_ = order;
2167   (*obj_iter)->recursive_ = true;
2168 
2169   // Include the uris that are TileDB objects in the iterator state
2170   ObjectType obj_type;
2171   for (auto& uri : uris) {
2172     RETURN_NOT_OK_ELSE(object_type(uri, &obj_type), tdb_delete(*obj_iter));
2173     if (obj_type != ObjectType::INVALID) {
2174       (*obj_iter)->objs_.push_back(uri);
2175       if (order == WalkOrder::POSTORDER)
2176         (*obj_iter)->expanded_.push_back(false);
2177     }
2178   }
2179 
2180   return Status::Ok();
2181 }
2182 
object_iter_begin(ObjectIter ** obj_iter,const char * path)2183 Status StorageManager::object_iter_begin(
2184     ObjectIter** obj_iter, const char* path) {
2185   // Sanity check
2186   URI path_uri(path);
2187   if (path_uri.is_invalid()) {
2188     return logger_->status(Status::StorageManagerError(
2189         "Cannot create object iterator; Invalid input path"));
2190   }
2191 
2192   // Get all contents of path
2193   std::vector<URI> uris;
2194   RETURN_NOT_OK(vfs_->ls(path_uri, &uris));
2195 
2196   // Create a new object iterator
2197   *obj_iter = tdb_new(ObjectIter);
2198   (*obj_iter)->order_ = WalkOrder::PREORDER;
2199   (*obj_iter)->recursive_ = false;
2200 
2201   // Include the uris that are TileDB objects in the iterator state
2202   ObjectType obj_type;
2203   for (auto& uri : uris) {
2204     RETURN_NOT_OK(object_type(uri, &obj_type));
2205     if (obj_type != ObjectType::INVALID)
2206       (*obj_iter)->objs_.push_back(uri);
2207   }
2208 
2209   return Status::Ok();
2210 }
2211 
object_iter_free(ObjectIter * obj_iter)2212 void StorageManager::object_iter_free(ObjectIter* obj_iter) {
2213   tdb_delete(obj_iter);
2214 }
2215 
object_iter_next(ObjectIter * obj_iter,const char ** path,ObjectType * type,bool * has_next)2216 Status StorageManager::object_iter_next(
2217     ObjectIter* obj_iter, const char** path, ObjectType* type, bool* has_next) {
2218   // Handle case there is no next
2219   if (obj_iter->objs_.empty()) {
2220     *has_next = false;
2221     return Status::Ok();
2222   }
2223 
2224   // Retrieve next object
2225   switch (obj_iter->order_) {
2226     case WalkOrder::PREORDER:
2227       RETURN_NOT_OK(object_iter_next_preorder(obj_iter, path, type, has_next));
2228       break;
2229     case WalkOrder::POSTORDER:
2230       RETURN_NOT_OK(object_iter_next_postorder(obj_iter, path, type, has_next));
2231       break;
2232   }
2233 
2234   return Status::Ok();
2235 }
2236 
/**
 * POSTORDER step of the object iterator.
 *
 * If the front entry has not been expanded yet, the function loops. Each
 * pass lists the current front entry, marks it expanded, and pushes its
 * children that are not INVALID per `object_type` onto the front of the
 * list, in listed order and each marked not expanded. The loop ends when a
 * pass adds no entries, i.e. the current front entry has no TileDB-object
 * children. That front entry is then reported through `path`, `type` and
 * `has_next`, and removed from the list.
 *
 * `*path` is `obj_iter->next_.c_str()`. It is invalidated when `next_` is
 * modified or the iterator is freed.
 */
Status StorageManager::object_iter_next_postorder(
    ObjectIter* obj_iter, const char** path, ObjectType* type, bool* has_next) {
  // Get all contents of the next URI recursively till the bottom,
  // if the front of the list has not been expanded
  if (obj_iter->expanded_.front() == false) {
    uint64_t obj_num;
    do {
      // Remember the list size so we can tell whether this pass added
      // any children.
      obj_num = obj_iter->objs_.size();
      std::vector<URI> uris;
      RETURN_NOT_OK(vfs_->ls(obj_iter->objs_.front(), &uris));
      obj_iter->expanded_.front() = true;

      // Push the new TileDB objects in the front of the iterator's list
      // (iterating in reverse keeps them in listed order at the front).
      ObjectType obj_type;
      for (auto it = uris.rbegin(); it != uris.rend(); ++it) {
        RETURN_NOT_OK(object_type(*it, &obj_type));
        if (obj_type != ObjectType::INVALID) {
          obj_iter->objs_.push_front(*it);
          obj_iter->expanded_.push_front(false);
        }
      }
      // Stop once the front entry produced no new children.
    } while (obj_num != obj_iter->objs_.size());
  }

  // Prepare the values to be returned
  URI front_uri = obj_iter->objs_.front();
  obj_iter->next_ = front_uri.to_string();
  RETURN_NOT_OK(object_type(front_uri, type));
  *path = obj_iter->next_.c_str();
  *has_next = true;

  // Pop the front (next URI) of the iterator's object list
  obj_iter->objs_.pop_front();
  obj_iter->expanded_.pop_front();

  return Status::Ok();
}
2274 
object_iter_next_preorder(ObjectIter * obj_iter,const char ** path,ObjectType * type,bool * has_next)2275 Status StorageManager::object_iter_next_preorder(
2276     ObjectIter* obj_iter, const char** path, ObjectType* type, bool* has_next) {
2277   // Prepare the values to be returned
2278   URI front_uri = obj_iter->objs_.front();
2279   obj_iter->next_ = front_uri.to_string();
2280   RETURN_NOT_OK(object_type(front_uri, type));
2281   *path = obj_iter->next_.c_str();
2282   *has_next = true;
2283 
2284   // Pop the front (next URI) of the iterator's object list
2285   obj_iter->objs_.pop_front();
2286 
2287   // Return if no recursion is needed
2288   if (!obj_iter->recursive_)
2289     return Status::Ok();
2290 
2291   // Get all contents of the next URI
2292   std::vector<URI> uris;
2293   RETURN_NOT_OK(vfs_->ls(front_uri, &uris));
2294 
2295   // Push the new TileDB objects in the front of the iterator's list
2296   ObjectType obj_type;
2297   for (auto it = uris.rbegin(); it != uris.rend(); ++it) {
2298     RETURN_NOT_OK(object_type(*it, &obj_type));
2299     if (obj_type != ObjectType::INVALID)
2300       obj_iter->objs_.push_front(*it);
2301   }
2302 
2303   return Status::Ok();
2304 }
2305 
query_submit(Query * query)2306 Status StorageManager::query_submit(Query* query) {
2307   // Process the query
2308   QueryInProgress in_progress(this);
2309   auto st = query->process();
2310 
2311   return st;
2312 }
2313 
query_submit_async(Query * query)2314 Status StorageManager::query_submit_async(Query* query) {
2315   // Push the query into the async queue
2316   return async_push_query(query);
2317 }
2318 
read_from_cache(const URI & uri,uint64_t offset,Buffer * buffer,uint64_t nbytes,bool * in_cache) const2319 Status StorageManager::read_from_cache(
2320     const URI& uri,
2321     uint64_t offset,
2322     Buffer* buffer,
2323     uint64_t nbytes,
2324     bool* in_cache) const {
2325   std::stringstream key;
2326   key << uri.to_string() << "+" << offset;
2327   RETURN_NOT_OK(tile_cache_->read(key.str(), buffer, 0, nbytes, in_cache));
2328   buffer->set_size(nbytes);
2329   buffer->reset_offset();
2330 
2331   return Status::Ok();
2332 }
2333 
read(const URI & uri,uint64_t offset,Buffer * buffer,uint64_t nbytes) const2334 Status StorageManager::read(
2335     const URI& uri, uint64_t offset, Buffer* buffer, uint64_t nbytes) const {
2336   RETURN_NOT_OK(buffer->realloc(nbytes));
2337   RETURN_NOT_OK(vfs_->read(uri, offset, buffer->data(), nbytes));
2338   buffer->set_size(nbytes);
2339   buffer->reset_offset();
2340 
2341   return Status::Ok();
2342 }
2343 
read(const URI & uri,uint64_t offset,void * buffer,uint64_t nbytes) const2344 Status StorageManager::read(
2345     const URI& uri, uint64_t offset, void* buffer, uint64_t nbytes) const {
2346   RETURN_NOT_OK(vfs_->read(uri, offset, buffer, nbytes));
2347   return Status::Ok();
2348 }
2349 
set_tag(const std::string & key,const std::string & value)2350 Status StorageManager::set_tag(
2351     const std::string& key, const std::string& value) {
2352   tags_[key] = value;
2353 
2354   // Tags are added to REST requests as HTTP headers.
2355   if (rest_client_ != nullptr)
2356     RETURN_NOT_OK(rest_client_->set_header(key, value));
2357 
2358   return Status::Ok();
2359 }
2360 
store_array_schema(ArraySchema * array_schema,const EncryptionKey & encryption_key)2361 Status StorageManager::store_array_schema(
2362     ArraySchema* array_schema, const EncryptionKey& encryption_key) {
2363   const URI schema_uri = array_schema->uri();
2364 
2365   // Serialize
2366   Buffer buff;
2367   RETURN_NOT_OK(array_schema->serialize(&buff));
2368 
2369   stats_->add_counter("write_array_schema_size", buff.size());
2370 
2371   // Delete file if it exists already
2372   bool exists;
2373   RETURN_NOT_OK(is_file(schema_uri, &exists));
2374   if (exists)
2375     RETURN_NOT_OK(vfs_->remove_file(schema_uri));
2376 
2377   // Check if the array schema directory exists
2378   // If not create it, this is caused by a pre-v10 array
2379   bool schema_dir_exists = false;
2380   URI array_schema_folder_uri =
2381       array_schema->array_uri().join_path(constants::array_schema_folder_name);
2382   RETURN_NOT_OK(is_dir(array_schema_folder_uri, &schema_dir_exists));
2383   if (!schema_dir_exists)
2384     RETURN_NOT_OK(create_dir(array_schema_folder_uri));
2385 
2386   // Write to file
2387   Tile tile(
2388       constants::generic_tile_datatype,
2389       constants::generic_tile_cell_size,
2390       0,
2391       &buff,
2392       false);
2393 
2394   GenericTileIO tile_io(this, schema_uri);
2395   uint64_t nbytes;
2396   Status st = tile_io.write_generic(&tile, encryption_key, &nbytes);
2397   (void)nbytes;
2398   if (st.ok())
2399     st = close_file(schema_uri);
2400 
2401   buff.clear();
2402 
2403   return st;
2404 }
2405 
store_array_metadata(const URI & array_uri,const EncryptionKey & encryption_key,Metadata * array_metadata)2406 Status StorageManager::store_array_metadata(
2407     const URI& array_uri,
2408     const EncryptionKey& encryption_key,
2409     Metadata* array_metadata) {
2410   auto timer_se = stats_->start_timer("write_array_meta");
2411 
2412   // Trivial case
2413   if (array_metadata == nullptr)
2414     return Status::Ok();
2415 
2416   // Serialize array metadata
2417   Buffer metadata_buff;
2418   RETURN_NOT_OK(array_metadata->serialize(&metadata_buff));
2419 
2420   // Do nothing if there are no metadata to write
2421   if (metadata_buff.size() == 0)
2422     return Status::Ok();
2423 
2424   stats_->add_counter("write_array_meta_size", metadata_buff.size());
2425 
2426   // Create a metadata file name
2427   URI array_metadata_uri;
2428   RETURN_NOT_OK(array_metadata->get_uri(array_uri, &array_metadata_uri));
2429 
2430   Tile tile(
2431       constants::generic_tile_datatype,
2432       constants::generic_tile_cell_size,
2433       0,
2434       &metadata_buff,
2435       false);
2436 
2437   GenericTileIO tile_io(this, array_metadata_uri);
2438   uint64_t nbytes;
2439   Status st = tile_io.write_generic(&tile, encryption_key, &nbytes);
2440   (void)nbytes;
2441 
2442   if (st.ok()) {
2443     st = close_file(array_metadata_uri);
2444   }
2445 
2446   metadata_buff.clear();
2447 
2448   return st;
2449 }
2450 
close_file(const URI & uri)2451 Status StorageManager::close_file(const URI& uri) {
2452   return vfs_->close_file(uri);
2453 }
2454 
sync(const URI & uri)2455 Status StorageManager::sync(const URI& uri) {
2456   return vfs_->sync(uri);
2457 }
2458 
vfs() const2459 VFS* StorageManager::vfs() const {
2460   return vfs_;
2461 }
2462 
wait_for_zero_in_progress()2463 void StorageManager::wait_for_zero_in_progress() {
2464   std::unique_lock<std::mutex> lck(queries_in_progress_mtx_);
2465   queries_in_progress_cv_.wait(
2466       lck, [this]() { return queries_in_progress_ == 0; });
2467 }
2468 
init_rest_client()2469 Status StorageManager::init_rest_client() {
2470   const char* server_address;
2471   RETURN_NOT_OK(config_.get("rest.server_address", &server_address));
2472   if (server_address != nullptr) {
2473     rest_client_.reset(tdb_new(RestClient));
2474     RETURN_NOT_OK(rest_client_->init(stats_, &config_, compute_tp_));
2475   }
2476 
2477   return Status::Ok();
2478 }
2479 
write_to_cache(const URI & uri,uint64_t offset,Buffer * buffer) const2480 Status StorageManager::write_to_cache(
2481     const URI& uri, uint64_t offset, Buffer* buffer) const {
2482   // Do not write metadata or array schema to cache
2483   std::string filename = uri.last_path_part();
2484   std::string uri_str = uri.to_string();
2485   if (filename == constants::fragment_metadata_filename ||
2486       (uri_str.find(constants::array_schema_folder_name) !=
2487        std::string::npos) ||
2488       filename == constants::array_schema_filename) {
2489     return Status::Ok();
2490   }
2491 
2492   // Generate key (uri + offset)
2493   std::stringstream key;
2494   key << uri.to_string() << "+" << offset;
2495 
2496   // Insert to cache
2497   Buffer cached_buffer;
2498   RETURN_NOT_OK(cached_buffer.write(buffer->data(), buffer->size()));
2499   RETURN_NOT_OK(
2500       tile_cache_->insert(key.str(), std::move(cached_buffer), false));
2501 
2502   return Status::Ok();
2503 }
2504 
write(const URI & uri,Buffer * buffer) const2505 Status StorageManager::write(const URI& uri, Buffer* buffer) const {
2506   return vfs_->write(uri, buffer->data(), buffer->size());
2507 }
2508 
write(const URI & uri,void * data,uint64_t size) const2509 Status StorageManager::write(const URI& uri, void* data, uint64_t size) const {
2510   return vfs_->write(uri, data, size);
2511 }
2512 
stats()2513 stats::Stats* StorageManager::stats() {
2514   return stats_;
2515 }
2516 
logger() const2517 tdb_shared_ptr<Logger> StorageManager::logger() const {
2518   return logger_;
2519 }
2520 
2521 /* ****************************** */
2522 /*         PRIVATE METHODS        */
2523 /* ****************************** */
2524 
/**
 * Opens `array_uri` for reads without loading any fragment metadata.
 *
 * Steps:
 *  1. Under `open_array_for_reads_mtx_` and `xlock_mtx_`, finds the entry
 *     for the URI in `open_arrays_for_reads_`, or creates and registers a
 *     new one. Calls `set_encryption_key` on the entry, then locks the
 *     entry's own mutex and increments its counter.
 *  2. Acquires the entry's file lock via the VFS.
 *  3. Loads the array schema(s) into the entry.
 *
 * On success, `*open_array` points to the entry and the entry mutex taken
 * in step 1 is still held; this function does not release it.
 * NOTE(review): presumably the caller releases it; confirm against callers.
 *
 * If step 2 or 3 fails, the entry mutex is released and
 * `array_close_for_reads(array_uri)` is called before returning the error.
 *
 * @param array_uri The array URI.
 * @param encryption_key Key passed to the entry's `set_encryption_key`.
 * @param open_array Output: the open-array entry for `array_uri`.
 * @return Status
 */
Status StorageManager::array_open_without_fragments(
    const URI& array_uri,
    const EncryptionKey& encryption_key,
    OpenArray** open_array) {
  auto timer_se = stats_->start_timer("read_array_open_without_fragments");
  if (!vfs_->supports_uri_scheme(array_uri))
    return logger_->status(
        Status::StorageManagerError("Cannot open array; "
                                    "URI scheme "
                                    "unsupported."));

  // Lock mutexes
  {
    std::lock_guard<std::mutex> lock{open_array_for_reads_mtx_};
    std::lock_guard<std::mutex> xlock{xlock_mtx_};

    // Find the open array entry and check encryption key
    auto it = open_arrays_for_reads_.find(array_uri.to_string());
    if (it != open_arrays_for_reads_.end()) {
      RETURN_NOT_OK(it->second->set_encryption_key(encryption_key));
      *open_array = it->second;
    } else {  // Create a new entry
      *open_array = tdb_new(OpenArray, array_uri, QueryType::READ);
      // The new entry is only registered after the key is set; on failure
      // it is deleted here. NOTE(review): `*open_array` is then left
      // pointing at freed memory; callers must not use it on error.
      RETURN_NOT_OK_ELSE(
          (*open_array)->set_encryption_key(encryption_key),
          tdb_delete(*open_array));
      open_arrays_for_reads_[array_uri.to_string()] = *open_array;
    }
    // Lock the array and increment counter
    // (done while still holding the registry mutexes above)
    (*open_array)->mtx_lock();
    (*open_array)->cnt_incr();
  }

  // Acquire a shared filelock
  auto st = (*open_array)->file_lock(vfs_);
  if (!st.ok()) {
    // Undo step 1: release the entry mutex, then drop this reader
    (*open_array)->mtx_unlock();
    array_close_for_reads(array_uri);
    return st;
  }

  // When opening the array we want to always reload the schema
  // Fragments are always relisted, now with schema evolution we
  // need to make sure we list out any new schemas too.
  // Fragment listing is significantly slower than listing schemas
  // and they happen in parallel so this is low impact
  st = load_array_schema(array_uri, *open_array, encryption_key);
  if (!st.ok()) {
    // Same cleanup as the file-lock failure path
    (*open_array)->mtx_unlock();
    array_close_for_reads(array_uri);
    return st;
  }

  return Status::Ok();
}
2580 
get_array_metadata_uris(const URI & array_uri,std::vector<URI> * array_metadata_uris) const2581 Status StorageManager::get_array_metadata_uris(
2582     const URI& array_uri, std::vector<URI>* array_metadata_uris) const {
2583   // Get all uris in the array metadata directory
2584   URI metadata_dir = array_uri.join_path(constants::array_metadata_folder_name);
2585   std::vector<URI> uris;
2586 
2587   bool is_dir;
2588   RETURN_NOT_OK(vfs_->is_dir(metadata_dir, &is_dir));
2589   if (!is_dir)
2590     return Status::Ok();
2591 
2592   RETURN_NOT_OK(vfs_->ls(metadata_dir.add_trailing_slash(), &uris));
2593 
2594   // Get only the metadata uris
2595   for (auto& uri : uris) {
2596     auto uri_last_path = uri.last_path_part();
2597     if (utils::parse::starts_with(uri_last_path, "."))
2598       continue;
2599 
2600     if (utils::parse::starts_with(uri_last_path, "__") &&
2601         !utils::parse::ends_with(uri_last_path, constants::vacuum_file_suffix))
2602       array_metadata_uris->push_back(uri);
2603   }
2604 
2605   return Status::Ok();
2606 }
2607 
load_array_schema(const URI & array_uri,OpenArray * open_array,const EncryptionKey & encryption_key)2608 Status StorageManager::load_array_schema(
2609     const URI& array_uri,
2610     OpenArray* open_array,
2611     const EncryptionKey& encryption_key) {
2612   // Do nothing if the array schema is already loaded
2613   if (open_array->array_schema_latest() != nullptr)
2614     return Status::Ok();
2615 
2616   auto array_schema = (ArraySchema*)nullptr;
2617   RETURN_NOT_OK(load_array_schema(array_uri, encryption_key, &array_schema));
2618   open_array->set_array_schema(array_schema);
2619 
2620   auto&& [st_schemas, schemas] =
2621       load_all_array_schemas(array_uri, encryption_key);
2622 
2623   if (!st_schemas.ok())
2624     return st_schemas;
2625 
2626   open_array->set_array_schemas_all(schemas.value());
2627 
2628   return Status::Ok();
2629 }
2630 
load_array_metadata(OpenArray * open_array,const EncryptionKey & encryption_key,const std::vector<TimestampedURI> & array_metadata_to_load,Metadata * metadata)2631 Status StorageManager::load_array_metadata(
2632     OpenArray* open_array,
2633     const EncryptionKey& encryption_key,
2634     const std::vector<TimestampedURI>& array_metadata_to_load,
2635     Metadata* metadata) {
2636   // Special case
2637   if (metadata == nullptr)
2638     return Status::Ok();
2639 
2640   auto metadata_num = array_metadata_to_load.size();
2641   std::vector<tdb_shared_ptr<Buffer>> metadata_buffs;
2642   metadata_buffs.resize(metadata_num);
2643   auto status = parallel_for(compute_tp_, 0, metadata_num, [&](size_t m) {
2644     const auto& uri = array_metadata_to_load[m].uri_;
2645     auto metadata_buff = open_array->array_metadata(uri);
2646     if (metadata_buff == nullptr) {  // Array metadata does not exist - load it
2647       GenericTileIO tile_io(this, uri);
2648       auto tile = (Tile*)nullptr;
2649       RETURN_NOT_OK(tile_io.read_generic(&tile, 0, encryption_key, config_));
2650 
2651       auto buffer = tile->buffer();
2652       metadata_buff = tdb_make_shared(Buffer);
2653       RETURN_NOT_OK(metadata_buff->realloc(buffer->size()));
2654       metadata_buff->set_size(buffer->size());
2655       buffer->reset_offset();
2656       RETURN_NOT_OK_ELSE(
2657           buffer->read(metadata_buff->data(), metadata_buff->size()),
2658           tdb_delete(tile));
2659       tdb_delete(tile);
2660 
2661       open_array->insert_array_metadata(uri, metadata_buff);
2662     }
2663     metadata_buffs[m] = metadata_buff;
2664     return Status::Ok();
2665   });
2666   RETURN_NOT_OK(status);
2667 
2668   // Compute array metadata size for the statistics
2669   uint64_t meta_size = 0;
2670   for (const auto& b : metadata_buffs)
2671     meta_size += b->size();
2672   stats_->add_counter("read_array_meta_size", meta_size);
2673 
2674   // Deserialize metadata buffers
2675   metadata->deserialize(metadata_buffs);
2676 
2677   // Sets the loaded metadata URIs
2678   metadata->set_loaded_metadata_uris(array_metadata_to_load);
2679 
2680   return Status::Ok();
2681 }
2682 
load_fragment_metadata(OpenArray * open_array,const EncryptionKey & encryption_key,const std::vector<TimestampedURI> & fragments_to_load,Buffer * meta_buff,const std::unordered_map<std::string,uint64_t> & offsets,std::vector<tdb_shared_ptr<FragmentMetadata>> * fragment_metadata)2683 Status StorageManager::load_fragment_metadata(
2684     OpenArray* open_array,
2685     const EncryptionKey& encryption_key,
2686     const std::vector<TimestampedURI>& fragments_to_load,
2687     Buffer* meta_buff,
2688     const std::unordered_map<std::string, uint64_t>& offsets,
2689     std::vector<tdb_shared_ptr<FragmentMetadata>>* fragment_metadata) {
2690   auto timer_se = stats_->start_timer("read_load_frag_meta");
2691 
2692   // Load the metadata for each fragment, only if they are not already
2693   // loaded
2694   auto fragment_num = fragments_to_load.size();
2695   fragment_metadata->resize(fragment_num);
2696   auto status = parallel_for(compute_tp_, 0, fragment_num, [&](size_t f) {
2697     const auto& sf = fragments_to_load[f];
2698     auto array_schema = open_array->array_schema_latest();
2699 
2700     auto metadata = open_array->fragment_metadata(sf.uri_);
2701     if (metadata == nullptr) {  // Fragment metadata does not exist - load it
2702       URI coords_uri =
2703           sf.uri_.join_path(constants::coords + constants::file_suffix);
2704 
2705       auto name = sf.uri_.remove_trailing_slash().last_path_part();
2706       uint32_t f_version;
2707       RETURN_NOT_OK(utils::parse::get_fragment_name_version(name, &f_version));
2708 
2709       // Note that the fragment metadata version is >= the array schema
2710       // version. Therefore, the check below is defensive and will always
2711       // ensure backwards compatibility.
2712       if (f_version == 1) {  // This is equivalent to format version <=2
2713         bool sparse;
2714         RETURN_NOT_OK(vfs_->is_file(coords_uri, &sparse));
2715         metadata = tdb_make_shared(
2716             FragmentMetadata,
2717             this,
2718             array_schema,
2719             sf.uri_,
2720             sf.timestamp_range_,
2721             !sparse);
2722       } else {  // Format version > 2
2723         metadata = tdb_make_shared(
2724             FragmentMetadata, this, array_schema, sf.uri_, sf.timestamp_range_);
2725       }
2726 
2727       // Potentially find the basic fragment metadata in the consolidated
2728       // metadata buffer
2729       Buffer* f_buff = nullptr;
2730       uint64_t offset = 0;
2731 
2732       auto it = offsets.end();
2733       if (metadata->format_version() >= 9) {
2734         it = offsets.find(name);
2735       } else {
2736         it = offsets.find(sf.uri_.to_string());
2737       }
2738       if (it != offsets.end()) {
2739         f_buff = meta_buff;
2740         offset = it->second;
2741       }
2742 
2743       // Load fragment metadata
2744       RETURN_NOT_OK(metadata->load(
2745           encryption_key, f_buff, offset, open_array->array_schemas_all()));
2746       open_array->insert_fragment_metadata(metadata);
2747     }
2748 
2749     (*fragment_metadata)[f] = metadata;
2750     return Status::Ok();
2751   });
2752   RETURN_NOT_OK(status);
2753 
2754   return Status::Ok();
2755 }
2756 
load_consolidated_fragment_meta(const URI & uri,const EncryptionKey & enc_key,Buffer * f_buff,std::unordered_map<std::string,uint64_t> * offsets)2757 Status StorageManager::load_consolidated_fragment_meta(
2758     const URI& uri,
2759     const EncryptionKey& enc_key,
2760     Buffer* f_buff,
2761     std::unordered_map<std::string, uint64_t>* offsets) {
2762   auto timer_se = stats_->start_timer("read_load_consolidated_frag_meta");
2763 
2764   // No consolidated fragment metadata file
2765   if (uri.to_string().empty())
2766     return Status::Ok();
2767 
2768   GenericTileIO tile_io(this, uri);
2769   Tile* tile = nullptr;
2770   RETURN_NOT_OK(tile_io.read_generic(&tile, 0, enc_key, config_));
2771 
2772   auto buffer = tile->buffer();
2773   f_buff->realloc(buffer->size());
2774   f_buff->set_size(buffer->size());
2775   buffer->reset_offset();
2776   RETURN_NOT_OK_ELSE(
2777       buffer->read(f_buff->data(), f_buff->size()), tdb_delete(tile));
2778   tdb_delete(tile);
2779 
2780   stats_->add_counter("consolidated_frag_meta_size", f_buff->size());
2781 
2782   uint32_t fragment_num;
2783   f_buff->reset_offset();
2784   f_buff->read(&fragment_num, sizeof(uint32_t));
2785 
2786   uint64_t name_size, offset;
2787   std::string name;
2788   for (uint32_t f = 0; f < fragment_num; ++f) {
2789     f_buff->read(&name_size, sizeof(uint64_t));
2790     name.resize(name_size);
2791     f_buff->read(&name[0], name_size);
2792     f_buff->read(&offset, sizeof(uint64_t));
2793     (*offsets)[name] = offset;
2794   }
2795 
2796   return Status::Ok();
2797 }
2798 
get_consolidated_fragment_meta_uri(const std::vector<URI> & uris,URI * meta_uri) const2799 Status StorageManager::get_consolidated_fragment_meta_uri(
2800     const std::vector<URI>& uris, URI* meta_uri) const {
2801   uint64_t t_latest = 0;
2802   std::pair<uint64_t, uint64_t> timestamp_range;
2803   for (const auto& uri : uris) {
2804     if (utils::parse::ends_with(uri.to_string(), constants::meta_file_suffix)) {
2805       RETURN_NOT_OK(utils::parse::get_timestamp_range(uri, &timestamp_range));
2806       if (timestamp_range.second > t_latest) {
2807         t_latest = timestamp_range.second;
2808         *meta_uri = uri;
2809       }
2810     }
2811   }
2812 
2813   return Status::Ok();
2814 }
2815 
get_sorted_uris(const std::vector<URI> & uris,std::vector<TimestampedURI> * sorted_uris,uint64_t timestamp_start,uint64_t timestamp_end) const2816 Status StorageManager::get_sorted_uris(
2817     const std::vector<URI>& uris,
2818     std::vector<TimestampedURI>* sorted_uris,
2819     uint64_t timestamp_start,
2820     uint64_t timestamp_end) const {
2821   // Do nothing if there are not enough URIs
2822   if (uris.empty())
2823     return Status::Ok();
2824 
2825   // Get the URIs that must be ignored
2826   std::vector<URI> vac_uris, to_ignore;
2827   RETURN_NOT_OK(get_uris_to_vacuum(
2828       uris, timestamp_start, timestamp_end, &to_ignore, &vac_uris, false));
2829   std::set<URI> to_ignore_set;
2830   for (const auto& uri : to_ignore)
2831     to_ignore_set.emplace(uri);
2832 
2833   // Filter based on vacuumed URIs and timestamp
2834   for (auto& uri : uris) {
2835     // Ignore vacuumed URIs
2836     if (to_ignore_set.find(uri) != to_ignore_set.end())
2837       continue;
2838 
2839     // Also ignore any vac uris
2840     if (this->is_vacuum_file(uri))
2841       continue;
2842 
2843     // Add only URIs whose first timestamp is greater than or equal to the
2844     // timestamp_start and whose second timestamp is smaller than or equal to
2845     // the timestamp_end
2846     std::pair<uint64_t, uint64_t> timestamp_range;
2847     RETURN_NOT_OK(utils::parse::get_timestamp_range(uri, &timestamp_range));
2848     auto t1 = timestamp_range.first;
2849     auto t2 = timestamp_range.second;
2850     if (t1 >= timestamp_start && t2 <= timestamp_end)
2851       sorted_uris->emplace_back(uri, timestamp_range);
2852   }
2853 
2854   // Sort the names based on the timestamps
2855   std::sort(sorted_uris->begin(), sorted_uris->end());
2856 
2857   return Status::Ok();
2858 }
2859 
get_uris_to_vacuum(const std::vector<URI> & uris,uint64_t timestamp_start,uint64_t timestamp_end,std::vector<URI> * to_vacuum,std::vector<URI> * vac_uris,bool allow_partial) const2860 Status StorageManager::get_uris_to_vacuum(
2861     const std::vector<URI>& uris,
2862     uint64_t timestamp_start,
2863     uint64_t timestamp_end,
2864     std::vector<URI>* to_vacuum,
2865     std::vector<URI>* vac_uris,
2866     bool allow_partial) const {
2867   // Get vacuum URIs
2868   std::vector<URI> vac_files;
2869   std::unordered_set<std::string> non_vac_uris_set;
2870   std::unordered_map<std::string, size_t> uris_map;
2871   for (size_t i = 0; i < uris.size(); ++i) {
2872     std::pair<uint64_t, uint64_t> timestamp_range;
2873     RETURN_NOT_OK(utils::parse::get_timestamp_range(uris[i], &timestamp_range));
2874 
2875     if (this->is_vacuum_file(uris[i])) {
2876       if (allow_partial) {
2877         if (timestamp_range.first <= timestamp_end &&
2878             timestamp_range.second >= timestamp_start)
2879           vac_files.emplace_back(uris[i]);
2880       } else {
2881         if (timestamp_range.first >= timestamp_start &&
2882             timestamp_range.second <= timestamp_end)
2883           vac_files.emplace_back(uris[i]);
2884       }
2885     } else {
2886       if (timestamp_range.first < timestamp_start ||
2887           timestamp_range.second > timestamp_end) {
2888         non_vac_uris_set.emplace(uris[i].to_string());
2889       } else {
2890         uris_map[uris[i].to_string()] = i;
2891       }
2892     }
2893   }
2894 
2895   // Compute fragment URIs to vacuum as a bitmap vector
2896   // Also determine which vac files to vacuum
2897   std::vector<int32_t> to_vacuum_vec(uris.size(), 0);
2898   std::vector<int32_t> to_vacuum_vac_files_vec(vac_files.size(), 0);
2899   auto status =
2900       parallel_for(compute_tp_, 0, vac_files.size(), [&, this](size_t i) {
2901         uint64_t size = 0;
2902         RETURN_NOT_OK(vfs_->file_size(vac_files[i], &size));
2903         std::string names;
2904         names.resize(size);
2905         RETURN_NOT_OK(vfs_->read(vac_files[i], 0, &names[0], size));
2906         std::stringstream ss(names);
2907         bool vacuum_vac_file = true;
2908         for (std::string uri_str; std::getline(ss, uri_str);) {
2909           auto it = uris_map.find(uri_str);
2910           if (it != uris_map.end())
2911             to_vacuum_vec[it->second] = 1;
2912 
2913           if (vacuum_vac_file &&
2914               non_vac_uris_set.find(uri_str) != non_vac_uris_set.end()) {
2915             vacuum_vac_file = false;
2916           }
2917         }
2918 
2919         to_vacuum_vac_files_vec[i] = vacuum_vac_file;
2920 
2921         return Status::Ok();
2922       });
2923   RETURN_NOT_OK(status);
2924 
2925   // Compute the URIs to vacuum
2926   to_vacuum->clear();
2927   for (size_t i = 0; i < uris.size(); ++i) {
2928     if (to_vacuum_vec[i] == 1)
2929       to_vacuum->emplace_back(uris[i]);
2930   }
2931 
2932   // Compute the vac URIs to vacuum
2933   vac_uris->clear();
2934   for (size_t i = 0; i < vac_files.size(); ++i) {
2935     if (to_vacuum_vac_files_vec[i] == 1)
2936       vac_uris->emplace_back(vac_files[i]);
2937   }
2938 
2939   return Status::Ok();
2940 }
2941 
set_default_tags()2942 Status StorageManager::set_default_tags() {
2943   const auto version = std::to_string(constants::library_version[0]) + "." +
2944                        std::to_string(constants::library_version[1]) + "." +
2945                        std::to_string(constants::library_version[2]);
2946 
2947   RETURN_NOT_OK(set_tag("x-tiledb-version", version));
2948   RETURN_NOT_OK(set_tag("x-tiledb-api-language", "c"));
2949 
2950   return Status::Ok();
2951 }
2952 
2953 }  // namespace sm
2954 }  // namespace tiledb
2955