1 /**
2  * @file   consolidator.cc
3  *
4  * @section LICENSE
5  *
6  * The MIT License
7  *
8  * @copyright Copyright (c) 2017-2021 TileDB, Inc.
9  *
10  * Permission is hereby granted, free of charge, to any person obtaining a copy
11  * of this software and associated documentation files (the "Software"), to deal
12  * in the Software without restriction, including without limitation the rights
13  * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
14  * copies of the Software, and to permit persons to whom the Software is
15  * furnished to do so, subject to the following conditions:
16  *
17  * The above copyright notice and this permission notice shall be included in
18  * all copies or substantial portions of the Software.
19  *
20  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
21  * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
22  * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
23  * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
24  * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
25  * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
26  * THE SOFTWARE.
27  *
28  * @section DESCRIPTION
29  *
30  * This file implements the Consolidator class.
31  */
32 
33 #include "tiledb/sm/storage_manager/consolidator.h"
34 #include "tiledb/common/logger.h"
35 #include "tiledb/sm/array_schema/array_schema.h"
36 #include "tiledb/sm/enums/datatype.h"
37 #include "tiledb/sm/enums/query_status.h"
38 #include "tiledb/sm/enums/query_type.h"
39 #include "tiledb/sm/filesystem/vfs.h"
40 #include "tiledb/sm/fragment/single_fragment_info.h"
41 #include "tiledb/sm/misc/parallel_functions.h"
42 #include "tiledb/sm/misc/utils.h"
43 #include "tiledb/sm/misc/uuid.h"
44 #include "tiledb/sm/query/query.h"
45 #include "tiledb/sm/stats/global_stats.h"
46 #include "tiledb/sm/storage_manager/storage_manager.h"
47 #include "tiledb/sm/tile/generic_tile_io.h"
48 #include "tiledb/sm/tile/tile.h"
49 
50 #include <iostream>
51 #include <sstream>
52 
53 using namespace tiledb::common;
54 
55 namespace tiledb {
56 namespace sm {
57 
58 /* ****************************** */
59 /*   CONSTRUCTORS & DESTRUCTORS   */
60 /* ****************************** */
61 
/**
 * Constructor.
 *
 * @param storage_manager The storage manager this consolidator operates
 *     through; also supplies the parent stats and logger objects below.
 */
Consolidator::Consolidator(StorageManager* storage_manager)
    : storage_manager_(storage_manager)
    // Child stats node so consolidation timers/counters nest under the
    // storage manager's stats tree.
    , stats_(storage_manager_->stats()->create_child("Consolidator"))
    // Clone the logger with an incremented id so log lines from distinct
    // Consolidator instances can be told apart.
    , logger_(storage_manager_->logger()->clone("Consolidator", ++logger_id_)) {
}
67 
// Trivial destructor; no resources beyond the members themselves.
Consolidator::~Consolidator() = default;
69 
70 /* ****************************** */
71 /*               API              */
72 /* ****************************** */
73 
consolidate(const char * array_name,EncryptionType encryption_type,const void * encryption_key,uint32_t key_length,const Config * config)74 Status Consolidator::consolidate(
75     const char* array_name,
76     EncryptionType encryption_type,
77     const void* encryption_key,
78     uint32_t key_length,
79     const Config* config) {
80   // Set config parameters
81   RETURN_NOT_OK(set_config(config));
82 
83   // Consolidate based on mode
84   URI array_uri = URI(array_name);
85   if (config_.mode_ == "fragment_meta")
86     return consolidate_fragment_meta(
87         array_uri, encryption_type, encryption_key, key_length);
88   else if (config_.mode_ == "fragments")
89     return consolidate_fragments(
90         array_name, encryption_type, encryption_key, key_length);
91   else if (config_.mode_ == "array_meta")
92     return consolidate_array_meta(
93         array_name, encryption_type, encryption_key, key_length);
94 
95   return logger_->status(Status::ConsolidatorError(
96       "Cannot consolidate; Invalid consolidation mode"));
97 }
98 
/**
 * Consolidates the array metadata into a single metadata entry and
 * writes a vacuum file listing the superseded metadata URIs.
 *
 * @param array_name Name (URI string) of the array.
 * @param encryption_type The encryption type of the array.
 * @param encryption_key The encryption key (may be null).
 * @param key_length Length of `encryption_key` in bytes.
 * @return Status
 */
Status Consolidator::consolidate_array_meta(
    const char* array_name,
    EncryptionType encryption_type,
    const void* encryption_key,
    uint32_t key_length) {
  auto timer_se = stats_->start_timer("consolidate_array_meta");

  // Open array for reading, restricted to the configured timestamp range.
  auto array_uri = URI(array_name);
  Array array_for_reads(array_uri, storage_manager_);
  RETURN_NOT_OK(array_for_reads.open(
      QueryType::READ,
      config_.timestamp_start_,
      config_.timestamp_end_,
      encryption_type,
      encryption_key,
      key_length));

  // Open array for writing; close the read-side array if this fails.
  Array array_for_writes(array_uri, storage_manager_);
  RETURN_NOT_OK_ELSE(
      array_for_writes.open(
          QueryType::WRITE, encryption_type, encryption_key, key_length),
      array_for_reads.close());

  // Swap the in-memory metadata between the two arrays.
  // After that, the array for writes will store the (consolidated by
  // the way metadata loading works) metadata of the array for reads
  Metadata* metadata_r;
  auto st = array_for_reads.metadata(&metadata_r);
  if (!st.ok()) {
    array_for_reads.close();
    array_for_writes.close();
    return st;
  }
  Metadata* metadata_w;
  st = array_for_writes.metadata(&metadata_w);
  if (!st.ok()) {
    array_for_reads.close();
    array_for_writes.close();
    return st;
  }
  metadata_r->swap(metadata_w);

  // Metadata uris to delete: the individual entries that were loaded
  // (and hence consolidated) into the write-side metadata above.
  const auto to_vacuum = metadata_w->loaded_metadata_uris();

  // Generate new name for consolidated metadata
  st = metadata_w->generate_uri(array_uri);
  if (!st.ok()) {
    array_for_reads.close();
    array_for_writes.close();
    return st;
  }

  // Get the new URI name
  URI new_uri;
  st = metadata_w->get_uri(array_uri, &new_uri);
  if (!st.ok()) {
    array_for_reads.close();
    array_for_writes.close();
    return st;
  }

  // Close arrays. NOTE(review): presumably closing the write-side array
  // persists the consolidated metadata — confirm against Array::close.
  RETURN_NOT_OK_ELSE(array_for_reads.close(), array_for_writes.close());
  RETURN_NOT_OK(array_for_writes.close());

  // Write vacuum file: one superseded metadata URI per line, so a later
  // vacuum pass knows which entries the consolidated metadata replaces.
  URI vac_uri = URI(new_uri.to_string() + constants::vacuum_file_suffix);

  std::stringstream ss;
  for (const auto& uri : to_vacuum)
    ss << uri.to_string() << "\n";

  auto data = ss.str();
  RETURN_NOT_OK(
      storage_manager_->vfs()->write(vac_uri, data.c_str(), data.size()));
  RETURN_NOT_OK(storage_manager_->vfs()->close_file(vac_uri));

  return Status::Ok();
}
181 
182 /* ****************************** */
183 /*        PRIVATE METHODS         */
184 /* ****************************** */
185 
/**
 * Consolidates fragments iteratively: each step selects the next set of
 * fragments to merge (`compute_next_to_consolidate`), merges them into a
 * single new fragment, and updates the in-memory fragment info, until no
 * suitable set remains or `config_.steps_` steps have been performed.
 *
 * @param array_name Name (URI string) of the array.
 * @param encryption_type The encryption type of the array.
 * @param encryption_key The encryption key (may be null).
 * @param key_length Length of `encryption_key` in bytes.
 * @return Status
 */
Status Consolidator::consolidate_fragments(
    const char* array_name,
    EncryptionType encryption_type,
    const void* encryption_key,
    uint32_t key_length) {
  auto timer_se = stats_->start_timer("consolidate_frags");

  // Open array for reading; fragments are loaded later, per step.
  Array array_for_reads(URI(array_name), storage_manager_);
  RETURN_NOT_OK(array_for_reads.open_without_fragments(
      encryption_type, encryption_key, key_length));

  // Open array for writing
  Array array_for_writes(array_for_reads.array_uri(), storage_manager_);
  RETURN_NOT_OK(array_for_writes.open(
      QueryType::WRITE, encryption_type, encryption_key, key_length));

  // Get fragment info within the configured timestamp range.
  FragmentInfo fragment_info;
  auto st = storage_manager_->get_fragment_info(
      array_for_reads,
      config_.timestamp_start_,
      config_.timestamp_end_,
      &fragment_info);
  if (!st.ok()) {
    array_for_reads.close();
    array_for_writes.close();
    return st;
  }

  uint32_t step = 0;
  std::vector<TimestampedURI> to_consolidate;
  do {
    // No need to consolidate if no more than 1 fragment exist
    if (fragment_info.fragment_num() <= 1)
      break;

    // Find the next fragments to be consolidated
    NDRange union_non_empty_domains;
    st = compute_next_to_consolidate(
        array_for_reads.array_schema_latest(),
        fragment_info,
        &to_consolidate,
        &union_non_empty_domains);
    if (!st.ok()) {
      array_for_reads.close();
      array_for_writes.close();
      return st;
    }

    // Check if there is anything to consolidate
    if (to_consolidate.size() <= 1)
      break;

    // Consolidate the selected fragments into a single new fragment.
    URI new_fragment_uri;
    st = consolidate(
        array_for_reads,
        array_for_writes,
        to_consolidate,
        union_non_empty_domains,
        &new_fragment_uri);
    if (!st.ok()) {
      array_for_reads.close();
      array_for_writes.close();
      return st;
    }

    // Get fragment info of the consolidated fragment
    SingleFragmentInfo new_fragment_info;
    st = storage_manager_->get_fragment_info(
        array_for_reads, new_fragment_uri, &new_fragment_info);
    if (!st.ok()) {
      array_for_reads.close();
      array_for_writes.close();
      return st;
    }

    // Replace the consolidated fragments with the new one in
    // `fragment_info`, so the next step sees the updated state.
    update_fragment_info(to_consolidate, new_fragment_info, &fragment_info);

    // Advance number of steps
    ++step;

  } while (step < config_.steps_);

  RETURN_NOT_OK_ELSE(array_for_reads.close(), array_for_writes.close());
  RETURN_NOT_OK(array_for_writes.close());

  stats_->add_counter("consolidate_step_num", step);

  return Status::Ok();
}
279 
are_consolidatable(const Domain * domain,const FragmentInfo & fragment_info,size_t start,size_t end,const NDRange & union_non_empty_domains) const280 bool Consolidator::are_consolidatable(
281     const Domain* domain,
282     const FragmentInfo& fragment_info,
283     size_t start,
284     size_t end,
285     const NDRange& union_non_empty_domains) const {
286   auto anterior_ndrange = fragment_info.anterior_ndrange();
287   if (anterior_ndrange.size() != 0 &&
288       domain->overlap(union_non_empty_domains, anterior_ndrange))
289     return false;
290 
291   // Check overlap of union with earlier fragments
292   const auto& fragments = fragment_info.fragments();
293   for (size_t i = 0; i < start; ++i) {
294     if (domain->overlap(
295             union_non_empty_domains, fragments[i].non_empty_domain()))
296       return false;
297   }
298 
299   // Check consolidation amplification factor
300   auto union_cell_num = domain->cell_num(union_non_empty_domains);
301   uint64_t sum_cell_num = 0;
302   for (size_t i = start; i <= end; ++i) {
303     sum_cell_num += domain->cell_num(fragments[i].expanded_non_empty_domain());
304   }
305 
306   return (double(union_cell_num) / sum_cell_num) <= config_.amplification_;
307 }
308 
/**
 * Merges the fragments in `to_consolidate` into one new fragment by
 * reading them from `array_for_reads` and writing the cells into
 * `array_for_writes`, then writes a vacuum file for the consumed
 * fragments.
 *
 * @param array_for_reads Array opened for reading.
 * @param array_for_writes Array opened for writing.
 * @param to_consolidate Timestamped URIs of the fragments to merge.
 * @param union_non_empty_domains Union of the fragments' non-empty
 *     domains; used as the read/write subarray.
 * @param new_fragment_uri Output: URI of the fragment that was created.
 * @return Status
 */
Status Consolidator::consolidate(
    Array& array_for_reads,
    Array& array_for_writes,
    const std::vector<TimestampedURI>& to_consolidate,
    const NDRange& union_non_empty_domains,
    URI* new_fragment_uri) {
  auto timer_se = stats_->start_timer("consolidate_main");

  RETURN_NOT_OK(array_for_reads.load_fragments(to_consolidate));

  // Nothing to copy if the selected fragments hold no cells.
  if (array_for_reads.is_empty()) {
    return Status::Ok();
  }

  // Get schema
  auto array_schema = array_for_reads.array_schema_latest();

  // Prepare buffers shared between the read and write queries.
  std::vector<ByteVec> buffers;
  std::vector<uint64_t> buffer_sizes;
  RETURN_NOT_OK(create_buffers(array_schema, &buffers, &buffer_sizes));

  // Create queries. The queries are heap-allocated with tdb_new and must
  // be freed with tdb_delete on every exit path below.
  auto query_r = (Query*)nullptr;
  auto query_w = (Query*)nullptr;
  auto st = create_queries(
      &array_for_reads,
      &array_for_writes,
      union_non_empty_domains,
      &query_r,
      &query_w,
      new_fragment_uri);
  if (!st.ok()) {
    tdb_delete(query_r);
    tdb_delete(query_w);
    return st;
  }

  // Read from one array and write to the other
  st = copy_array(query_r, query_w, &buffers, &buffer_sizes);
  if (!st.ok()) {
    tdb_delete(query_r);
    tdb_delete(query_w);
    return st;
  }

  // Finalize write query. On failure, also remove the partially-written
  // fragment directory so no broken fragment is left behind.
  st = query_w->finalize();
  if (!st.ok()) {
    tdb_delete(query_r);
    tdb_delete(query_w);
    bool is_dir = false;
    auto st2 = storage_manager_->vfs()->is_dir(*new_fragment_uri, &is_dir);
    (void)st2;  // Perhaps report this once we support an error stack
    if (is_dir)
      storage_manager_->vfs()->remove_dir(*new_fragment_uri);
    return st;
  }

  // Write vacuum file listing the consumed fragments; on failure, remove
  // the new fragment directory as above.
  st = write_vacuum_file(*new_fragment_uri, to_consolidate);
  if (!st.ok()) {
    tdb_delete(query_r);
    tdb_delete(query_w);
    bool is_dir = false;
    storage_manager_->vfs()->is_dir(*new_fragment_uri, &is_dir);
    if (is_dir)
      storage_manager_->vfs()->remove_dir(*new_fragment_uri);
    return st;
  }

  // Clean up
  tdb_delete(query_r);
  tdb_delete(query_w);

  return st;
}
386 
/**
 * Consolidates the footers of all (format version > 2) fragment metadata
 * into a single generic tile. The serialized layout is: fragment count
 * (uint32), then per fragment {name size (uint64), name, footer offset
 * (uint64)}, then the concatenated footers.
 *
 * NOTE(review): the early RETURN_NOT_OKs between `array.open` and
 * `array.close` below return while `array` is still open — confirm
 * whether Array's destructor performs the close, or whether cleanup is
 * needed on those paths.
 *
 * @param array_uri URI of the array.
 * @param encryption_type The encryption type of the array.
 * @param encryption_key The encryption key (may be null).
 * @param key_length Length of `encryption_key` in bytes.
 * @return Status
 */
Status Consolidator::consolidate_fragment_meta(
    const URI& array_uri,
    EncryptionType encryption_type,
    const void* encryption_key,
    uint32_t key_length) {
  auto timer_se = stats_->start_timer("consolidate_frag_meta");

  // Open array for reading
  Array array(array_uri, storage_manager_);
  RETURN_NOT_OK(
      array.open(QueryType::READ, encryption_type, encryption_key, key_length));

  // Include only fragments with footers / separate basic metadata
  Buffer buff;
  const auto& tmp_meta = array.fragment_metadata();
  std::vector<tdb_shared_ptr<FragmentMetadata>> meta;
  for (auto m : tmp_meta) {
    if (m->format_version() > 2)
      meta.emplace_back(m);
  }
  auto fragment_num = (unsigned)meta.size();

  // Do not consolidate if the number of fragments is not >1
  if (fragment_num < 2)
    return array.close();

  // Write number of fragments
  RETURN_NOT_OK(buff.write(&fragment_num, sizeof(uint32_t)));

  // Compute new URI from the first and last fragment URIs, with the
  // dedicated meta-file suffix appended.
  URI uri;
  auto first = meta.front()->fragment_uri();
  auto last = meta.back()->fragment_uri();
  RETURN_NOT_OK(compute_new_fragment_uri(
      first, last, array.array_schema_latest()->write_version(), &uri));
  uri = URI(uri.to_string() + constants::meta_file_suffix);

  // Get the consolidated fragment metadata version by parsing the new
  // name (with its suffix stripped).
  auto meta_name = uri.remove_trailing_slash().last_path_part();
  auto pos = meta_name.find_last_of('.');
  meta_name = (pos == std::string::npos) ? meta_name : meta_name.substr(0, pos);
  uint32_t meta_version = 0;
  RETURN_NOT_OK(utils::parse::get_fragment_version(meta_name, &meta_version));

  // Calculate offset of first fragment footer. Versions >= 9 store only
  // the last path part as the name; earlier versions store the full URI.
  uint64_t offset = sizeof(uint32_t);  // Fragment num
  for (auto m : meta) {
    offset += sizeof(uint64_t);  // Name size
    if (meta_version >= 9) {
      offset += m->fragment_uri().last_path_part().size();  // Name
    } else {
      offset += m->fragment_uri().to_string().size();  // Name
    }
    offset += sizeof(uint64_t);  // Offset
  }

  // Serialize all fragment names and footer offsets into a single buffer
  for (auto m : meta) {
    // Write name size and name
    std::string name;
    if (meta_version >= 9) {
      name = m->fragment_uri().last_path_part();
    } else {
      name = m->fragment_uri().to_string();
    }
    auto name_size = (uint64_t)name.size();
    RETURN_NOT_OK(buff.write(&name_size, sizeof(uint64_t)));
    RETURN_NOT_OK(buff.write(name.c_str(), name_size));
    RETURN_NOT_OK(buff.write(&offset, sizeof(uint64_t)));

    // Advance past this fragment's footer for the next offset entry.
    offset += m->footer_size();
  }

  // Serialize all fragment metadata footers in parallel
  std::vector<Buffer> buffs(meta.size());
  auto status = parallel_for(
      storage_manager_->compute_tp(), 0, buffs.size(), [&](size_t i) {
        RETURN_NOT_OK(meta[i]->write_footer(&buffs[i]));
        return Status::Ok();
      });
  RETURN_NOT_OK(status);

  // Combine serialized fragment metadata footers into a single buffer
  for (const auto& b : buffs)
    RETURN_NOT_OK(buff.write(b.data(), b.size()));

  // Close array
  RETURN_NOT_OK(array.close());

  // Write the combined buffer to storage as a single generic tile.
  EncryptionKey enc_key;
  RETURN_NOT_OK(enc_key.set_key(encryption_type, encryption_key, key_length));
  buff.reset_offset();
  Tile tile(
      constants::generic_tile_datatype,
      constants::generic_tile_cell_size,
      0,
      &buff,
      false);
  // The tile does not own the buffer's memory; `buff` retains ownership
  // and is cleared below.
  buff.disown_data();
  GenericTileIO tile_io(storage_manager_, uri);
  uint64_t nbytes = 0;
  RETURN_NOT_OK_ELSE(
      tile_io.write_generic(&tile, enc_key, &nbytes), buff.clear());
  (void)nbytes;
  RETURN_NOT_OK_ELSE(storage_manager_->close_file(uri), buff.clear());

  buff.clear();

  return Status::Ok();
}
498 
copy_array(Query * query_r,Query * query_w,std::vector<ByteVec> * buffers,std::vector<uint64_t> * buffer_sizes)499 Status Consolidator::copy_array(
500     Query* query_r,
501     Query* query_w,
502     std::vector<ByteVec>* buffers,
503     std::vector<uint64_t>* buffer_sizes) {
504   auto timer_se = stats_->start_timer("consolidate_copy_array");
505 
506   // Set the read query buffers outside the repeated submissions.
507   // The Reader will reset the query buffer sizes to the original
508   // sizes, not the potentially smaller sizes of the results after
509   // the query submission.
510   RETURN_NOT_OK(set_query_buffers(query_r, buffers, buffer_sizes));
511 
512   do {
513     // READ
514     RETURN_NOT_OK(query_r->submit());
515 
516     // Set explicitly the write query buffers, as the sizes may have
517     // been altered by the read query.
518     RETURN_NOT_OK(set_query_buffers(query_w, buffers, buffer_sizes));
519 
520     // WRITE
521     RETURN_NOT_OK(query_w->submit());
522   } while (query_r->status() == QueryStatus::INCOMPLETE);
523 
524   return Status::Ok();
525 }
526 
create_buffers(const ArraySchema * array_schema,std::vector<ByteVec> * buffers,std::vector<uint64_t> * buffer_sizes)527 Status Consolidator::create_buffers(
528     const ArraySchema* array_schema,
529     std::vector<ByteVec>* buffers,
530     std::vector<uint64_t>* buffer_sizes) {
531   auto timer_se = stats_->start_timer("consolidate_create_buffers");
532 
533   // For easy reference
534   auto attribute_num = array_schema->attribute_num();
535   auto domain = array_schema->domain();
536   auto dim_num = array_schema->dim_num();
537   auto sparse = !array_schema->dense();
538 
539   // Calculate number of buffers
540   size_t buffer_num = 0;
541   for (unsigned i = 0; i < attribute_num; ++i) {
542     buffer_num += (array_schema->attributes()[i]->var_size()) ? 2 : 1;
543     buffer_num += (array_schema->attributes()[i]->nullable()) ? 1 : 0;
544   }
545   if (sparse) {
546     for (unsigned i = 0; i < dim_num; ++i)
547       buffer_num += (domain->dimension(i)->var_size()) ? 2 : 1;
548   }
549 
550   // Create buffers
551   buffers->resize(buffer_num);
552   buffer_sizes->resize(buffer_num);
553 
554   // Allocate space for each buffer
555   for (unsigned i = 0; i < buffer_num; ++i) {
556     (*buffers)[i].resize(config_.buffer_size_);
557     (*buffer_sizes)[i] = config_.buffer_size_;
558   }
559 
560   // Success
561   return Status::Ok();
562 }
563 
/**
 * Creates the read and write queries used for consolidation, and
 * computes the URI of the new (consolidated) fragment from the first
 * and last fragment URIs of the read query.
 *
 * @param array_for_reads Array opened for reading.
 * @param array_for_writes Array opened for writing.
 * @param subarray Subarray to read/write (union of non-empty domains).
 * @param query_r Output: the created read query (heap-allocated).
 * @param query_w Output: the created write query (heap-allocated).
 * @param new_fragment_uri Output: URI of the fragment to be created.
 * @return Status
 */
Status Consolidator::create_queries(
    Array* array_for_reads,
    Array* array_for_writes,
    const NDRange& subarray,
    Query** query_r,
    Query** query_w,
    URI* new_fragment_uri) {
  auto timer_se = stats_->start_timer("consolidate_create_queries");

  // Note: it is safe to use `set_subarray_safe` for `subarray` below
  // because the subarray is calculated by the TileDB algorithm (it
  // is not a user input prone to errors).

  // Create read query
  *query_r = tdb_new(Query, storage_manager_, array_for_reads);
  RETURN_NOT_OK((*query_r)->set_layout(Layout::GLOBAL_ORDER));

  // Refactored reader optimizes for no subarray.
  if (!config_.use_refactored_reader_ ||
      array_for_reads->array_schema_latest()->dense())
    RETURN_NOT_OK((*query_r)->set_subarray_unsafe(subarray));

  // Get last fragment URI, which will be the URI of the consolidated fragment
  auto first = (*query_r)->first_fragment_uri();
  auto last = (*query_r)->last_fragment_uri();

  RETURN_NOT_OK(compute_new_fragment_uri(
      first,
      last,
      array_for_reads->array_schema_latest()->write_version(),
      new_fragment_uri));

  // Create write query, targeting the new fragment URI. The global-order
  // check is disabled because the read query already emits cells in
  // global order.
  *query_w =
      tdb_new(Query, storage_manager_, array_for_writes, *new_fragment_uri);
  RETURN_NOT_OK((*query_w)->set_layout(Layout::GLOBAL_ORDER));
  RETURN_NOT_OK((*query_w)->disable_check_global_order());
  // Dense writes require an explicit subarray; sparse writes do not.
  if (array_for_reads->array_schema_latest()->dense())
    RETURN_NOT_OK((*query_w)->set_subarray_unsafe(subarray));

  return Status::Ok();
}
606 
/**
 * Selects the next set of fragments to consolidate using a dynamic
 * program over (set size) x (starting fragment): among all candidate
 * runs of adjacent fragments with cardinality in [min_frags, max_frags]
 * that satisfy the size-ratio and consolidatability constraints, it
 * picks the largest cardinality with the (approximately) minimum total
 * size.
 *
 * @param array_schema The array schema.
 * @param fragment_info Info on all fragments, ordered on the timeline.
 * @param to_consolidate Output: the selected fragments (empty if none).
 * @param union_non_empty_domains Output: union of the selected
 *     fragments' non-empty domains.
 * @return Status
 */
Status Consolidator::compute_next_to_consolidate(
    const ArraySchema* array_schema,
    const FragmentInfo& fragment_info,
    std::vector<TimestampedURI>* to_consolidate,
    NDRange* union_non_empty_domains) const {
  auto timer_se = stats_->start_timer("consolidate_compute_next");

  // Preparation. Clamp min/max fragment counts to the available number.
  auto sparse = !array_schema->dense();
  const auto& fragments = fragment_info.fragments();
  auto domain = array_schema->domain();
  to_consolidate->clear();
  auto min = config_.min_frags_;
  min = (uint32_t)((min > fragments.size()) ? fragments.size() : min);
  auto max = config_.max_frags_;
  max = (uint32_t)((max > fragments.size()) ? fragments.size() : max);
  auto size_ratio = config_.size_ratio_;

  // Trivial case - no fragments
  if (max == 0)
    return Status::Ok();

  // Prepare the dynamic-programming matrices. The rows are from 1 to max
  // and the columns represent the fragments in `fragments`. One matrix
  // stores the sum of fragment sizes, and the other the union of the
  // corresponding non-empty domains of the fragments.
  std::vector<std::vector<uint64_t>> m_sizes;
  std::vector<std::vector<NDRange>> m_union;
  auto col_num = fragments.size();
  auto row_num = max;
  m_sizes.resize(row_num);
  for (auto& row : m_sizes)
    row.resize(col_num);
  m_union.resize(row_num);
  for (auto& row : m_union) {
    row.resize(col_num);
  }

  // Entry m[i][j] contains the collective size of fragments
  // fragments[j], ..., fragments[j+i]. If the size ratio
  // of any adjacent pair in the above list is smaller than the
  // defined one, or the entries' corresponding fragments are no
  // consolidatable, then the size sum of that entry is infinity
  // (UINT64_MAX) and the memory from the union matrix is freed.
  // This marks this entry as invalid and it will never be selected
  // as the winner for choosing which fragments to consolidate next.
  for (size_t i = 0; i < row_num; ++i) {
    for (size_t j = 0; j < col_num; ++j) {
      if (i == 0) {  // In the first row we store the sizes of `fragments`
        m_sizes[i][j] = fragments[j].fragment_size();
        m_union[i][j] = fragments[j].non_empty_domain();
      } else if (i + j >= col_num) {  // Non-valid entries
        m_sizes[i][j] = UINT64_MAX;
        m_union[i][j].clear();
        m_union[i][j].shrink_to_fit();
      } else {  // Every other row is computed using the previous row
        // Size ratio of the newly appended fragment and its predecessor,
        // normalized to (0, 1] (smaller / larger).
        auto ratio = (float)fragments[i + j - 1].fragment_size() /
                     fragments[i + j].fragment_size();
        ratio = (ratio <= 1.0f) ? ratio : 1.0f / ratio;
        if (ratio >= size_ratio && (m_sizes[i - 1][j] != UINT64_MAX)) {
          m_sizes[i][j] = m_sizes[i - 1][j] + fragments[i + j].fragment_size();
          m_union[i][j] = m_union[i - 1][j];
          domain->expand_ndrange(
              fragments[i + j].non_empty_domain(), &m_union[i][j]);
          domain->expand_to_tiles(&m_union[i][j]);
          // For dense arrays, also verify the tile-expanded union does
          // not break correctness or blow up amplification.
          if (!sparse && !are_consolidatable(
                             domain, fragment_info, j, j + i, m_union[i][j])) {
            // Mark this entry as invalid
            m_sizes[i][j] = UINT64_MAX;
            m_union[i][j].clear();
            m_union[i][j].shrink_to_fit();
          }
        } else {
          // Mark this entry as invalid
          m_sizes[i][j] = UINT64_MAX;
          m_union[i][j].clear();
          m_union[i][j].shrink_to_fit();
        }
      }
    }
  }

  // Choose the maximal set of fragments with cardinality in [min, max]
  // with the minimum size
  uint64_t min_size = UINT64_MAX;
  size_t min_col = 0;
  for (int i = row_num - 1; (i >= 0 && i >= (int)min - 1); --i) {
    min_size = UINT64_MAX;
    for (size_t j = 0; j < col_num; ++j) {
      // Update the min size if the new size is more than 25% smaller.
      // This is to give preference to earlier fragment sets, in case
      // the user writes in *approximately* equal batches. Otherwise,
      // fragment sets in the middle of the timeline may get consolidated,
      // which will hinder the next step of consolidation (which will
      // select some small and some big fragments).
      if (min_size == UINT64_MAX || m_sizes[i][j] < (min_size / 1.25)) {
        min_size = m_sizes[i][j];
        min_col = j;
      }
    }

    // Results not found
    if (min_size == UINT64_MAX)
      continue;

    // Results found
    for (size_t f = min_col; f <= min_col + i; ++f) {
      to_consolidate->emplace_back(
          fragments[f].uri(), fragments[f].timestamp_range());
    }
    *union_non_empty_domains = m_union[i][min_col];
    break;
  }

  return Status::Ok();
}
723 
compute_new_fragment_uri(const URI & first,const URI & last,uint32_t format_version,URI * new_uri) const724 Status Consolidator::compute_new_fragment_uri(
725     const URI& first,
726     const URI& last,
727     uint32_t format_version,
728     URI* new_uri) const {
729   // Get uuid
730   std::string uuid;
731   RETURN_NOT_OK(uuid::generate_uuid(&uuid, false));
732 
733   // For creating the new fragment URI
734 
735   // Get timestamp ranges
736   std::pair<uint64_t, uint64_t> t_first, t_last;
737   RETURN_NOT_OK(utils::parse::get_timestamp_range(first, &t_first));
738   RETURN_NOT_OK(utils::parse::get_timestamp_range(last, &t_last));
739 
740   // Create new URI
741   std::stringstream ss;
742   ss << first.parent().to_string() << "/__" << t_first.first << "_"
743      << t_last.second << "_" << uuid << "_" << format_version;
744 
745   *new_uri = URI(ss.str());
746 
747   return Status::Ok();
748 }
749 
/**
 * Attaches the shared consolidation buffers to the given query, in the
 * same order `create_buffers` allocated them: for each attribute, a data
 * buffer if fixed-size, or an offsets buffer followed by a data buffer
 * if var-size, plus a trailing validity buffer if nullable; then, for
 * non-dense arrays, the same per-dimension layout.
 *
 * @param query The query to set the buffers on.
 * @param buffers The shared buffers.
 * @param buffer_sizes Sizes corresponding to `buffers`.
 * @return Status
 */
Status Consolidator::set_query_buffers(
    Query* query,
    std::vector<ByteVec>* buffers,
    std::vector<uint64_t>* buffer_sizes) const {
  auto array_schema = query->array_schema();
  auto dim_num = array_schema->dim_num();
  auto dense = array_schema->dense();
  auto attributes = array_schema->attributes();
  // `bid` walks the flat buffer list; it must advance by exactly the
  // number of buffers each field consumes (1, 2 or 3).
  unsigned bid = 0;
  for (const auto& attr : attributes) {
    if (!attr->var_size()) {
      if (!attr->nullable()) {
        // Fixed-size, non-nullable: single data buffer.
        RETURN_NOT_OK(query->set_data_buffer(
            attr->name(), (void*)&(*buffers)[bid][0], &(*buffer_sizes)[bid]));
        ++bid;
      } else {
        // Fixed-size, nullable: data buffer + validity bytemap.
        RETURN_NOT_OK(query->set_buffer_vbytemap(
            attr->name(),
            (void*)&(*buffers)[bid][0],
            &(*buffer_sizes)[bid],
            (uint8_t*)&(*buffers)[bid + 1][0],
            &(*buffer_sizes)[bid + 1]));
        bid += 2;
      }
    } else {
      if (!attr->nullable()) {
        // Var-size, non-nullable: offsets at `bid`, data at `bid + 1`.
        RETURN_NOT_OK(query->set_data_buffer(
            attr->name(),
            (void*)&(*buffers)[bid + 1][0],
            &(*buffer_sizes)[bid + 1]));
        RETURN_NOT_OK(query->set_offsets_buffer(
            attr->name(),
            (uint64_t*)&(*buffers)[bid][0],
            &(*buffer_sizes)[bid]));
        bid += 2;
      } else {
        // Var-size, nullable: offsets, data, validity bytemap.
        RETURN_NOT_OK(query->set_buffer_vbytemap(
            attr->name(),
            (uint64_t*)&(*buffers)[bid][0],
            &(*buffer_sizes)[bid],
            (void*)&(*buffers)[bid + 1][0],
            &(*buffer_sizes)[bid + 1],
            (uint8_t*)&(*buffers)[bid + 2][0],
            &(*buffer_sizes)[bid + 2]));
        bid += 3;
      }
    }
  }
  // Coordinate buffers only exist for non-dense arrays (see
  // create_buffers, which allocates them only when sparse).
  if (!dense) {
    for (unsigned d = 0; d < dim_num; ++d) {
      auto dim = array_schema->dimension(d);
      auto dim_name = dim->name();
      if (!dim->var_size()) {
        RETURN_NOT_OK(query->set_data_buffer(
            dim_name, (void*)&(*buffers)[bid][0], &(*buffer_sizes)[bid]));
        ++bid;
      } else {
        // Var-size dimension: offsets at `bid`, data at `bid + 1`.
        RETURN_NOT_OK(query->set_data_buffer(
            dim_name,
            (void*)&(*buffers)[bid + 1][0],
            &(*buffer_sizes)[bid + 1]));
        RETURN_NOT_OK(query->set_offsets_buffer(
            dim_name, (uint64_t*)&(*buffers)[bid][0], &(*buffer_sizes)[bid]));
        bid += 2;
      }
    }
  }

  return Status::Ok();
}
820 
update_fragment_info(const std::vector<TimestampedURI> & to_consolidate,const SingleFragmentInfo & new_fragment_info,FragmentInfo * fragment_info) const821 void Consolidator::update_fragment_info(
822     const std::vector<TimestampedURI>& to_consolidate,
823     const SingleFragmentInfo& new_fragment_info,
824     FragmentInfo* fragment_info) const {
825   auto to_consolidate_it = to_consolidate.begin();
826   auto fragment_it = fragment_info->fragments().begin();
827   FragmentInfo updated_fragment_info;
828   bool new_fragment_added = false;
829 
830   while (fragment_it != fragment_info->fragments().end()) {
831     // No match - add the fragment info and advance `fragment_it`
832     if (to_consolidate_it == to_consolidate.end() ||
833         fragment_it->uri().to_string() != to_consolidate_it->uri_.to_string()) {
834       updated_fragment_info.append(*fragment_it);
835       ++fragment_it;
836     } else {  // Match - add new fragment only once and advance both iterators
837       if (!new_fragment_added) {
838         updated_fragment_info.append(new_fragment_info);
839         new_fragment_added = true;
840       }
841       ++fragment_it;
842       ++to_consolidate_it;
843     }
844   }
845 
846   assert(
847       updated_fragment_info.fragment_num() ==
848       fragment_info->fragment_num() - to_consolidate.size() + 1);
849 
850   *fragment_info = std::move(updated_fragment_info);
851 }
852 
set_config(const Config * config)853 Status Consolidator::set_config(const Config* config) {
854   // Set the config
855   Config merged_config = storage_manager_->config();
856   if (config)
857     merged_config.inherit(*config);
858   bool found = false;
859   config_.amplification_ = 0.0f;
860   RETURN_NOT_OK(merged_config.get<float>(
861       "sm.consolidation.amplification", &config_.amplification_, &found));
862   assert(found);
863   config_.steps_ = 0;
864   RETURN_NOT_OK(merged_config.get<uint32_t>(
865       "sm.consolidation.steps", &config_.steps_, &found));
866   assert(found);
867   config_.buffer_size_ = 0;
868   RETURN_NOT_OK(merged_config.get<uint64_t>(
869       "sm.consolidation.buffer_size", &config_.buffer_size_, &found));
870   assert(found);
871   config_.size_ratio_ = 0.0f;
872   RETURN_NOT_OK(merged_config.get<float>(
873       "sm.consolidation.step_size_ratio", &config_.size_ratio_, &found));
874   assert(found);
875   config_.min_frags_ = 0;
876   RETURN_NOT_OK(merged_config.get<uint32_t>(
877       "sm.consolidation.step_min_frags", &config_.min_frags_, &found));
878   assert(found);
879   config_.max_frags_ = 0;
880   RETURN_NOT_OK(merged_config.get<uint32_t>(
881       "sm.consolidation.step_max_frags", &config_.max_frags_, &found));
882   assert(found);
883   const std::string mode = merged_config.get("sm.consolidation.mode", &found);
884   if (!found)
885     return logger_->status(Status::ConsolidatorError(
886         "Cannot consolidate; Consolidation mode cannot be null"));
887   config_.mode_ = mode;
888   RETURN_NOT_OK(merged_config.get<uint64_t>(
889       "sm.consolidation.timestamp_start", &config_.timestamp_start_, &found));
890   assert(found);
891   RETURN_NOT_OK(merged_config.get<uint64_t>(
892       "sm.consolidation.timestamp_end", &config_.timestamp_end_, &found));
893   assert(found);
894   std::string reader =
895       merged_config.get("sm.query.sparse_global_order.reader", &found);
896   assert(found);
897   config_.use_refactored_reader_ = reader.compare("reafctored") == 0;
898 
899   // Sanity checks
900   if (config_.min_frags_ > config_.max_frags_)
901     return logger_->status(Status::ConsolidatorError(
902         "Invalid configuration; Minimum fragments config parameter is larger "
903         "than the maximum"));
904   if (config_.size_ratio_ > 1.0f || config_.size_ratio_ < 0.0f)
905     return logger_->status(Status::ConsolidatorError(
906         "Invalid configuration; Step size ratio config parameter must be in "
907         "[0.0, 1.0]"));
908   if (config_.amplification_ < 0)
909     return logger_->status(
910         Status::ConsolidatorError("Invalid configuration; Amplification config "
911                                   "parameter must be non-negative"));
912 
913   return Status::Ok();
914 }
915 
write_vacuum_file(const URI & new_uri,const std::vector<TimestampedURI> & to_consolidate) const916 Status Consolidator::write_vacuum_file(
917     const URI& new_uri,
918     const std::vector<TimestampedURI>& to_consolidate) const {
919   URI vac_uri = URI(new_uri.to_string() + constants::vacuum_file_suffix);
920 
921   std::stringstream ss;
922   for (const auto& timestampedURI : to_consolidate)
923     ss << timestampedURI.uri_.to_string() << "\n";
924 
925   auto data = ss.str();
926   RETURN_NOT_OK(
927       storage_manager_->vfs()->write(vac_uri, data.c_str(), data.size()));
928   RETURN_NOT_OK(storage_manager_->vfs()->close_file(vac_uri));
929 
930   return Status::Ok();
931 }
932 
933 }  // namespace sm
934 }  // namespace tiledb
935