1 /**
2 * @file consolidator.cc
3 *
4 * @section LICENSE
5 *
6 * The MIT License
7 *
8 * @copyright Copyright (c) 2017-2021 TileDB, Inc.
9 *
10 * Permission is hereby granted, free of charge, to any person obtaining a copy
11 * of this software and associated documentation files (the "Software"), to deal
12 * in the Software without restriction, including without limitation the rights
13 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
14 * copies of the Software, and to permit persons to whom the Software is
15 * furnished to do so, subject to the following conditions:
16 *
17 * The above copyright notice and this permission notice shall be included in
18 * all copies or substantial portions of the Software.
19 *
20 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
21 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
22 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
23 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
24 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
25 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
26 * THE SOFTWARE.
27 *
28 * @section DESCRIPTION
29 *
30 * This file implements the Consolidator class.
31 */
32
33 #include "tiledb/sm/storage_manager/consolidator.h"
34 #include "tiledb/common/logger.h"
35 #include "tiledb/sm/array_schema/array_schema.h"
36 #include "tiledb/sm/enums/datatype.h"
37 #include "tiledb/sm/enums/query_status.h"
38 #include "tiledb/sm/enums/query_type.h"
39 #include "tiledb/sm/filesystem/vfs.h"
40 #include "tiledb/sm/fragment/single_fragment_info.h"
41 #include "tiledb/sm/misc/parallel_functions.h"
42 #include "tiledb/sm/misc/utils.h"
43 #include "tiledb/sm/misc/uuid.h"
44 #include "tiledb/sm/query/query.h"
45 #include "tiledb/sm/stats/global_stats.h"
46 #include "tiledb/sm/storage_manager/storage_manager.h"
47 #include "tiledb/sm/tile/generic_tile_io.h"
48 #include "tiledb/sm/tile/tile.h"
49
50 #include <iostream>
51 #include <sstream>
52
53 using namespace tiledb::common;
54
55 namespace tiledb {
56 namespace sm {
57
58 /* ****************************** */
59 /* CONSTRUCTORS & DESTRUCTORS */
60 /* ****************************** */
61
/**
 * Constructor. Binds the consolidator to the given storage manager and
 * creates a child stats node and a cloned logger for this instance.
 */
Consolidator::Consolidator(StorageManager* storage_manager)
    : storage_manager_(storage_manager)
    , stats_(storage_manager_->stats()->create_child("Consolidator"))
    , logger_(storage_manager_->logger()->clone("Consolidator", ++logger_id_)) {
}
67
68 Consolidator::~Consolidator() = default;
69
70 /* ****************************** */
71 /* API */
72 /* ****************************** */
73
consolidate(const char * array_name,EncryptionType encryption_type,const void * encryption_key,uint32_t key_length,const Config * config)74 Status Consolidator::consolidate(
75 const char* array_name,
76 EncryptionType encryption_type,
77 const void* encryption_key,
78 uint32_t key_length,
79 const Config* config) {
80 // Set config parameters
81 RETURN_NOT_OK(set_config(config));
82
83 // Consolidate based on mode
84 URI array_uri = URI(array_name);
85 if (config_.mode_ == "fragment_meta")
86 return consolidate_fragment_meta(
87 array_uri, encryption_type, encryption_key, key_length);
88 else if (config_.mode_ == "fragments")
89 return consolidate_fragments(
90 array_name, encryption_type, encryption_key, key_length);
91 else if (config_.mode_ == "array_meta")
92 return consolidate_array_meta(
93 array_name, encryption_type, encryption_key, key_length);
94
95 return logger_->status(Status::ConsolidatorError(
96 "Cannot consolidate; Invalid consolidation mode"));
97 }
98
/**
 * Consolidates the array metadata of the input array.
 *
 * Opens the array once for reading (which loads the metadata in its
 * consolidated, in-memory form) and once for writing, swaps the two
 * metadata objects so the write-side array holds the consolidated
 * metadata, writes it under a freshly generated URI, and finally
 * writes a vacuum file listing the metadata URIs that were subsumed.
 *
 * @param array_name Name (URI string) of the array.
 * @param encryption_type The encryption type of the array.
 * @param encryption_key The encryption key (may be nullptr).
 * @param key_length Length of the encryption key.
 * @return Status
 */
Status Consolidator::consolidate_array_meta(
    const char* array_name,
    EncryptionType encryption_type,
    const void* encryption_key,
    uint32_t key_length) {
  auto timer_se = stats_->start_timer("consolidate_array_meta");

  // Open array for reading, restricted to the configured timestamp range.
  auto array_uri = URI(array_name);
  Array array_for_reads(array_uri, storage_manager_);
  RETURN_NOT_OK(array_for_reads.open(
      QueryType::READ,
      config_.timestamp_start_,
      config_.timestamp_end_,
      encryption_type,
      encryption_key,
      key_length));

  // Open array for writing; close the read array if the open fails.
  Array array_for_writes(array_uri, storage_manager_);
  RETURN_NOT_OK_ELSE(
      array_for_writes.open(
          QueryType::WRITE, encryption_type, encryption_key, key_length),
      array_for_reads.close());

  // Swap the in-memory metadata between the two arrays.
  // After that, the array for writes will store the (consolidated by
  // the way metadata loading works) metadata of the array for reads.
  // Note: from here on, every error path must close both arrays.
  Metadata* metadata_r;
  auto st = array_for_reads.metadata(&metadata_r);
  if (!st.ok()) {
    array_for_reads.close();
    array_for_writes.close();
    return st;
  }
  Metadata* metadata_w;
  st = array_for_writes.metadata(&metadata_w);
  if (!st.ok()) {
    array_for_reads.close();
    array_for_writes.close();
    return st;
  }
  metadata_r->swap(metadata_w);

  // Metadata uris to delete (those that got consolidated on load).
  const auto to_vacuum = metadata_w->loaded_metadata_uris();

  // Generate new name for consolidated metadata
  st = metadata_w->generate_uri(array_uri);
  if (!st.ok()) {
    array_for_reads.close();
    array_for_writes.close();
    return st;
  }

  // Get the new URI name
  URI new_uri;
  st = metadata_w->get_uri(array_uri, &new_uri);
  if (!st.ok()) {
    array_for_reads.close();
    array_for_writes.close();
    return st;
  }

  // Close arrays; closing the write array flushes the swapped metadata.
  RETURN_NOT_OK_ELSE(array_for_reads.close(), array_for_writes.close());
  RETURN_NOT_OK(array_for_writes.close());

  // Write vacuum file: one consolidated metadata URI per line.
  URI vac_uri = URI(new_uri.to_string() + constants::vacuum_file_suffix);

  std::stringstream ss;
  for (const auto& uri : to_vacuum)
    ss << uri.to_string() << "\n";

  auto data = ss.str();
  RETURN_NOT_OK(
      storage_manager_->vfs()->write(vac_uri, data.c_str(), data.size()));
  RETURN_NOT_OK(storage_manager_->vfs()->close_file(vac_uri));

  return Status::Ok();
}
181
182 /* ****************************** */
183 /* PRIVATE METHODS */
184 /* ****************************** */
185
/**
 * Consolidates the fragments of the input array.
 *
 * Runs up to `config_.steps_` consolidation steps. Each step selects
 * the next best subset of fragments (compute_next_to_consolidate),
 * consolidates them into a new fragment, and updates the in-memory
 * fragment info so the next step sees the new state. Stops early when
 * fewer than two fragments remain or no suitable subset is found.
 *
 * @param array_name Name (URI string) of the array.
 * @param encryption_type The encryption type of the array.
 * @param encryption_key The encryption key (may be nullptr).
 * @param key_length Length of the encryption key.
 * @return Status
 */
Status Consolidator::consolidate_fragments(
    const char* array_name,
    EncryptionType encryption_type,
    const void* encryption_key,
    uint32_t key_length) {
  auto timer_se = stats_->start_timer("consolidate_frags");

  // Open array for reading (fragments are loaded later, per step).
  Array array_for_reads(URI(array_name), storage_manager_);
  RETURN_NOT_OK(array_for_reads.open_without_fragments(
      encryption_type, encryption_key, key_length));

  // Open array for writing
  Array array_for_writes(array_for_reads.array_uri(), storage_manager_);
  RETURN_NOT_OK(array_for_writes.open(
      QueryType::WRITE, encryption_type, encryption_key, key_length));

  // Get fragment info restricted to the configured timestamp range.
  // Note: from here on, every error path must close both arrays.
  FragmentInfo fragment_info;
  auto st = storage_manager_->get_fragment_info(
      array_for_reads,
      config_.timestamp_start_,
      config_.timestamp_end_,
      &fragment_info);
  if (!st.ok()) {
    array_for_reads.close();
    array_for_writes.close();
    return st;
  }

  uint32_t step = 0;
  std::vector<TimestampedURI> to_consolidate;
  do {
    // No need to consolidate if no more than 1 fragment exist
    if (fragment_info.fragment_num() <= 1)
      break;

    // Find the next fragments to be consolidated
    NDRange union_non_empty_domains;
    st = compute_next_to_consolidate(
        array_for_reads.array_schema_latest(),
        fragment_info,
        &to_consolidate,
        &union_non_empty_domains);
    if (!st.ok()) {
      array_for_reads.close();
      array_for_writes.close();
      return st;
    }

    // Check if there is anything to consolidate
    if (to_consolidate.size() <= 1)
      break;

    // Consolidate the selected fragments
    URI new_fragment_uri;
    st = consolidate(
        array_for_reads,
        array_for_writes,
        to_consolidate,
        union_non_empty_domains,
        &new_fragment_uri);
    if (!st.ok()) {
      array_for_reads.close();
      array_for_writes.close();
      return st;
    }

    // Get fragment info of the consolidated fragment
    SingleFragmentInfo new_fragment_info;
    st = storage_manager_->get_fragment_info(
        array_for_reads, new_fragment_uri, &new_fragment_info);
    if (!st.ok()) {
      array_for_reads.close();
      array_for_writes.close();
      return st;
    }

    // Update fragment info: replace the consolidated fragments with the
    // new one so the next step operates on the updated fragment set.
    update_fragment_info(to_consolidate, new_fragment_info, &fragment_info);

    // Advance number of steps
    ++step;

  } while (step < config_.steps_);

  RETURN_NOT_OK_ELSE(array_for_reads.close(), array_for_writes.close());
  RETURN_NOT_OK(array_for_writes.close());

  stats_->add_counter("consolidate_step_num", step);

  return Status::Ok();
}
279
are_consolidatable(const Domain * domain,const FragmentInfo & fragment_info,size_t start,size_t end,const NDRange & union_non_empty_domains) const280 bool Consolidator::are_consolidatable(
281 const Domain* domain,
282 const FragmentInfo& fragment_info,
283 size_t start,
284 size_t end,
285 const NDRange& union_non_empty_domains) const {
286 auto anterior_ndrange = fragment_info.anterior_ndrange();
287 if (anterior_ndrange.size() != 0 &&
288 domain->overlap(union_non_empty_domains, anterior_ndrange))
289 return false;
290
291 // Check overlap of union with earlier fragments
292 const auto& fragments = fragment_info.fragments();
293 for (size_t i = 0; i < start; ++i) {
294 if (domain->overlap(
295 union_non_empty_domains, fragments[i].non_empty_domain()))
296 return false;
297 }
298
299 // Check consolidation amplification factor
300 auto union_cell_num = domain->cell_num(union_non_empty_domains);
301 uint64_t sum_cell_num = 0;
302 for (size_t i = start; i <= end; ++i) {
303 sum_cell_num += domain->cell_num(fragments[i].expanded_non_empty_domain());
304 }
305
306 return (double(union_cell_num) / sum_cell_num) <= config_.amplification_;
307 }
308
/**
 * Consolidates the given (already selected) fragments into a single new
 * fragment, by reading everything through a global-order read query and
 * writing it back through a global-order write query, then writing the
 * vacuum file for the consolidated fragments.
 *
 * On failure after the write query has started producing data, the
 * partially written fragment directory is removed.
 *
 * @param array_for_reads Array opened for reading.
 * @param array_for_writes Array opened for writing.
 * @param to_consolidate The fragments (with timestamps) to consolidate.
 * @param union_non_empty_domains Union of the fragments' non-empty
 *     domains, used as the query subarray.
 * @param new_fragment_uri Output: URI of the new consolidated fragment.
 * @return Status
 */
Status Consolidator::consolidate(
    Array& array_for_reads,
    Array& array_for_writes,
    const std::vector<TimestampedURI>& to_consolidate,
    const NDRange& union_non_empty_domains,
    URI* new_fragment_uri) {
  auto timer_se = stats_->start_timer("consolidate_main");

  RETURN_NOT_OK(array_for_reads.load_fragments(to_consolidate));

  // Nothing to copy if the selected fragments contain no data.
  if (array_for_reads.is_empty()) {
    return Status::Ok();
  }

  // Get schema
  auto array_schema = array_for_reads.array_schema_latest();

  // Prepare buffers
  std::vector<ByteVec> buffers;
  std::vector<uint64_t> buffer_sizes;
  RETURN_NOT_OK(create_buffers(array_schema, &buffers, &buffer_sizes));

  // Create queries. The queries are raw-owned here, so every error path
  // below must delete both before returning.
  auto query_r = (Query*)nullptr;
  auto query_w = (Query*)nullptr;
  auto st = create_queries(
      &array_for_reads,
      &array_for_writes,
      union_non_empty_domains,
      &query_r,
      &query_w,
      new_fragment_uri);
  if (!st.ok()) {
    tdb_delete(query_r);
    tdb_delete(query_w);
    return st;
  }

  // Read from one array and write to the other
  st = copy_array(query_r, query_w, &buffers, &buffer_sizes);
  if (!st.ok()) {
    tdb_delete(query_r);
    tdb_delete(query_w);
    return st;
  }

  // Finalize write query. On failure, remove the partially written
  // fragment directory (best effort).
  st = query_w->finalize();
  if (!st.ok()) {
    tdb_delete(query_r);
    tdb_delete(query_w);
    bool is_dir = false;
    auto st2 = storage_manager_->vfs()->is_dir(*new_fragment_uri, &is_dir);
    (void)st2;  // Perhaps report this once we support an error stack
    if (is_dir)
      storage_manager_->vfs()->remove_dir(*new_fragment_uri);
    return st;
  }

  // Write vacuum file; on failure, clean up the new fragment as above.
  // NOTE(review): the is_dir status is silently ignored here (unlike in
  // the finalize error path above) — presumably intentional best-effort.
  st = write_vacuum_file(*new_fragment_uri, to_consolidate);
  if (!st.ok()) {
    tdb_delete(query_r);
    tdb_delete(query_w);
    bool is_dir = false;
    storage_manager_->vfs()->is_dir(*new_fragment_uri, &is_dir);
    if (is_dir)
      storage_manager_->vfs()->remove_dir(*new_fragment_uri);
    return st;
  }

  // Clean up
  tdb_delete(query_r);
  tdb_delete(query_w);

  return st;
}
386
consolidate_fragment_meta(const URI & array_uri,EncryptionType encryption_type,const void * encryption_key,uint32_t key_length)387 Status Consolidator::consolidate_fragment_meta(
388 const URI& array_uri,
389 EncryptionType encryption_type,
390 const void* encryption_key,
391 uint32_t key_length) {
392 auto timer_se = stats_->start_timer("consolidate_frag_meta");
393
394 // Open array for reading
395 Array array(array_uri, storage_manager_);
396 RETURN_NOT_OK(
397 array.open(QueryType::READ, encryption_type, encryption_key, key_length));
398
399 // Include only fragments with footers / separate basic metadata
400 Buffer buff;
401 const auto& tmp_meta = array.fragment_metadata();
402 std::vector<tdb_shared_ptr<FragmentMetadata>> meta;
403 for (auto m : tmp_meta) {
404 if (m->format_version() > 2)
405 meta.emplace_back(m);
406 }
407 auto fragment_num = (unsigned)meta.size();
408
409 // Do not consolidate if the number of fragments is not >1
410 if (fragment_num < 2)
411 return array.close();
412
413 // Write number of fragments
414 RETURN_NOT_OK(buff.write(&fragment_num, sizeof(uint32_t)));
415
416 // Compute new URI
417 URI uri;
418 auto first = meta.front()->fragment_uri();
419 auto last = meta.back()->fragment_uri();
420 RETURN_NOT_OK(compute_new_fragment_uri(
421 first, last, array.array_schema_latest()->write_version(), &uri));
422 uri = URI(uri.to_string() + constants::meta_file_suffix);
423
424 // Get the consolidated fragment metadata version
425 auto meta_name = uri.remove_trailing_slash().last_path_part();
426 auto pos = meta_name.find_last_of('.');
427 meta_name = (pos == std::string::npos) ? meta_name : meta_name.substr(0, pos);
428 uint32_t meta_version = 0;
429 RETURN_NOT_OK(utils::parse::get_fragment_version(meta_name, &meta_version));
430
431 // Calculate offset of first fragment footer
432 uint64_t offset = sizeof(uint32_t); // Fragment num
433 for (auto m : meta) {
434 offset += sizeof(uint64_t); // Name size
435 if (meta_version >= 9) {
436 offset += m->fragment_uri().last_path_part().size(); // Name
437 } else {
438 offset += m->fragment_uri().to_string().size(); // Name
439 }
440 offset += sizeof(uint64_t); // Offset
441 }
442
443 // Serialize all fragment names and footer offsets into a single buffer
444 for (auto m : meta) {
445 // Write name size and name
446 std::string name;
447 if (meta_version >= 9) {
448 name = m->fragment_uri().last_path_part();
449 } else {
450 name = m->fragment_uri().to_string();
451 }
452 auto name_size = (uint64_t)name.size();
453 RETURN_NOT_OK(buff.write(&name_size, sizeof(uint64_t)));
454 RETURN_NOT_OK(buff.write(name.c_str(), name_size));
455 RETURN_NOT_OK(buff.write(&offset, sizeof(uint64_t)));
456
457 offset += m->footer_size();
458 }
459
460 // Serialize all fragment metadata footers in parallel
461 std::vector<Buffer> buffs(meta.size());
462 auto status = parallel_for(
463 storage_manager_->compute_tp(), 0, buffs.size(), [&](size_t i) {
464 RETURN_NOT_OK(meta[i]->write_footer(&buffs[i]));
465 return Status::Ok();
466 });
467 RETURN_NOT_OK(status);
468
469 // Combine serialized fragment metadata footers into a single buffer
470 for (const auto& b : buffs)
471 RETURN_NOT_OK(buff.write(b.data(), b.size()));
472
473 // Close array
474 RETURN_NOT_OK(array.close());
475
476 // Write to file
477 EncryptionKey enc_key;
478 RETURN_NOT_OK(enc_key.set_key(encryption_type, encryption_key, key_length));
479 buff.reset_offset();
480 Tile tile(
481 constants::generic_tile_datatype,
482 constants::generic_tile_cell_size,
483 0,
484 &buff,
485 false);
486 buff.disown_data();
487 GenericTileIO tile_io(storage_manager_, uri);
488 uint64_t nbytes = 0;
489 RETURN_NOT_OK_ELSE(
490 tile_io.write_generic(&tile, enc_key, &nbytes), buff.clear());
491 (void)nbytes;
492 RETURN_NOT_OK_ELSE(storage_manager_->close_file(uri), buff.clear());
493
494 buff.clear();
495
496 return Status::Ok();
497 }
498
copy_array(Query * query_r,Query * query_w,std::vector<ByteVec> * buffers,std::vector<uint64_t> * buffer_sizes)499 Status Consolidator::copy_array(
500 Query* query_r,
501 Query* query_w,
502 std::vector<ByteVec>* buffers,
503 std::vector<uint64_t>* buffer_sizes) {
504 auto timer_se = stats_->start_timer("consolidate_copy_array");
505
506 // Set the read query buffers outside the repeated submissions.
507 // The Reader will reset the query buffer sizes to the original
508 // sizes, not the potentially smaller sizes of the results after
509 // the query submission.
510 RETURN_NOT_OK(set_query_buffers(query_r, buffers, buffer_sizes));
511
512 do {
513 // READ
514 RETURN_NOT_OK(query_r->submit());
515
516 // Set explicitly the write query buffers, as the sizes may have
517 // been altered by the read query.
518 RETURN_NOT_OK(set_query_buffers(query_w, buffers, buffer_sizes));
519
520 // WRITE
521 RETURN_NOT_OK(query_w->submit());
522 } while (query_r->status() == QueryStatus::INCOMPLETE);
523
524 return Status::Ok();
525 }
526
create_buffers(const ArraySchema * array_schema,std::vector<ByteVec> * buffers,std::vector<uint64_t> * buffer_sizes)527 Status Consolidator::create_buffers(
528 const ArraySchema* array_schema,
529 std::vector<ByteVec>* buffers,
530 std::vector<uint64_t>* buffer_sizes) {
531 auto timer_se = stats_->start_timer("consolidate_create_buffers");
532
533 // For easy reference
534 auto attribute_num = array_schema->attribute_num();
535 auto domain = array_schema->domain();
536 auto dim_num = array_schema->dim_num();
537 auto sparse = !array_schema->dense();
538
539 // Calculate number of buffers
540 size_t buffer_num = 0;
541 for (unsigned i = 0; i < attribute_num; ++i) {
542 buffer_num += (array_schema->attributes()[i]->var_size()) ? 2 : 1;
543 buffer_num += (array_schema->attributes()[i]->nullable()) ? 1 : 0;
544 }
545 if (sparse) {
546 for (unsigned i = 0; i < dim_num; ++i)
547 buffer_num += (domain->dimension(i)->var_size()) ? 2 : 1;
548 }
549
550 // Create buffers
551 buffers->resize(buffer_num);
552 buffer_sizes->resize(buffer_num);
553
554 // Allocate space for each buffer
555 for (unsigned i = 0; i < buffer_num; ++i) {
556 (*buffers)[i].resize(config_.buffer_size_);
557 (*buffer_sizes)[i] = config_.buffer_size_;
558 }
559
560 // Success
561 return Status::Ok();
562 }
563
/**
 * Creates the global-order read and write queries used for
 * consolidation, and computes the URI of the new consolidated fragment
 * from the first/last fragment URIs of the read query.
 *
 * The caller owns and must delete `*query_r` and `*query_w`.
 *
 * @param array_for_reads Array opened for reading.
 * @param array_for_writes Array opened for writing.
 * @param subarray The subarray (union of non-empty domains) to read.
 * @param query_r Output: the created read query.
 * @param query_w Output: the created write query.
 * @param new_fragment_uri Output: URI of the new consolidated fragment.
 * @return Status
 */
Status Consolidator::create_queries(
    Array* array_for_reads,
    Array* array_for_writes,
    const NDRange& subarray,
    Query** query_r,
    Query** query_w,
    URI* new_fragment_uri) {
  auto timer_se = stats_->start_timer("consolidate_create_queries");

  // Note: it is safe to use `set_subarray_safe` for `subarray` below
  // because the subarray is calculated by the TileDB algorithm (it
  // is not a user input prone to errors).

  // Create read query
  *query_r = tdb_new(Query, storage_manager_, array_for_reads);
  RETURN_NOT_OK((*query_r)->set_layout(Layout::GLOBAL_ORDER));

  // Refactored reader optimizes for no subarray, so set one only for
  // the legacy reader or for dense arrays.
  if (!config_.use_refactored_reader_ ||
      array_for_reads->array_schema_latest()->dense())
    RETURN_NOT_OK((*query_r)->set_subarray_unsafe(subarray));

  // Get first/last fragment URIs, which determine the timestamp range
  // encoded in the URI of the consolidated fragment.
  auto first = (*query_r)->first_fragment_uri();
  auto last = (*query_r)->last_fragment_uri();

  RETURN_NOT_OK(compute_new_fragment_uri(
      first,
      last,
      array_for_reads->array_schema_latest()->write_version(),
      new_fragment_uri));

  // Create write query, targeting the new fragment URI directly.
  *query_w =
      tdb_new(Query, storage_manager_, array_for_writes, *new_fragment_uri);
  RETURN_NOT_OK((*query_w)->set_layout(Layout::GLOBAL_ORDER));
  // Cells arrive already in global order from the read query.
  RETURN_NOT_OK((*query_w)->disable_check_global_order());
  if (array_for_reads->array_schema_latest()->dense())
    RETURN_NOT_OK((*query_w)->set_subarray_unsafe(subarray));

  return Status::Ok();
}
606
/**
 * Selects the next subset of fragments to consolidate, using a
 * dynamic-programming search over contiguous fragment runs whose sizes
 * are "similar enough" (per the configured size ratio) and which are
 * consolidatable (per `are_consolidatable` for dense arrays).
 *
 * On success, `to_consolidate` holds the chosen fragments (possibly
 * empty if no suitable run exists) and `union_non_empty_domains` their
 * expanded domain union.
 *
 * @param array_schema The array schema.
 * @param fragment_info Info about all candidate fragments.
 * @param to_consolidate Output: the fragments chosen for consolidation.
 * @param union_non_empty_domains Output: union of their domains.
 * @return Status
 */
Status Consolidator::compute_next_to_consolidate(
    const ArraySchema* array_schema,
    const FragmentInfo& fragment_info,
    std::vector<TimestampedURI>* to_consolidate,
    NDRange* union_non_empty_domains) const {
  auto timer_se = stats_->start_timer("consolidate_compute_next");

  // Preparation: clamp min/max fragment counts to the available number.
  auto sparse = !array_schema->dense();
  const auto& fragments = fragment_info.fragments();
  auto domain = array_schema->domain();
  to_consolidate->clear();
  auto min = config_.min_frags_;
  min = (uint32_t)((min > fragments.size()) ? fragments.size() : min);
  auto max = config_.max_frags_;
  max = (uint32_t)((max > fragments.size()) ? fragments.size() : max);
  auto size_ratio = config_.size_ratio_;

  // Trivial case - no fragments
  if (max == 0)
    return Status::Ok();

  // Prepare the dynamic-programming matrices. The rows are from 1 to max
  // and the columns represent the fragments in `fragments`. One matrix
  // stores the sum of fragment sizes, and the other the union of the
  // corresponding non-empty domains of the fragments.
  std::vector<std::vector<uint64_t>> m_sizes;
  std::vector<std::vector<NDRange>> m_union;
  auto col_num = fragments.size();
  auto row_num = max;
  m_sizes.resize(row_num);
  for (auto& row : m_sizes)
    row.resize(col_num);
  m_union.resize(row_num);
  for (auto& row : m_union) {
    row.resize(col_num);
  }

  // Entry m[i][j] contains the collective size of fragments
  // fragments[j], ..., fragments[j+i]. If the size ratio
  // of any adjacent pair in the above list is smaller than the
  // defined one, or the entries' corresponding fragments are not
  // consolidatable, then the size sum of that entry is infinity
  // (UINT64_MAX) and the memory from the union matrix is freed.
  // This marks this entry as invalid and it will never be selected
  // as the winner for choosing which fragments to consolidate next.
  for (size_t i = 0; i < row_num; ++i) {
    for (size_t j = 0; j < col_num; ++j) {
      if (i == 0) {  // In the first row we store the sizes of `fragments`
        m_sizes[i][j] = fragments[j].fragment_size();
        m_union[i][j] = fragments[j].non_empty_domain();
      } else if (i + j >= col_num) {  // Non-valid entries
        m_sizes[i][j] = UINT64_MAX;
        m_union[i][j].clear();
        m_union[i][j].shrink_to_fit();
      } else {  // Every other row is computed using the previous row
        // Size ratio of the newly added fragment vs its predecessor,
        // normalized into (0, 1].
        auto ratio = (float)fragments[i + j - 1].fragment_size() /
                     fragments[i + j].fragment_size();
        ratio = (ratio <= 1.0f) ? ratio : 1.0f / ratio;
        if (ratio >= size_ratio && (m_sizes[i - 1][j] != UINT64_MAX)) {
          m_sizes[i][j] = m_sizes[i - 1][j] + fragments[i + j].fragment_size();
          m_union[i][j] = m_union[i - 1][j];
          domain->expand_ndrange(
              fragments[i + j].non_empty_domain(), &m_union[i][j]);
          domain->expand_to_tiles(&m_union[i][j]);
          // Dense arrays additionally require the domain-overlap and
          // amplification checks.
          if (!sparse && !are_consolidatable(
                             domain, fragment_info, j, j + i, m_union[i][j])) {
            // Mark this entry as invalid
            m_sizes[i][j] = UINT64_MAX;
            m_union[i][j].clear();
            m_union[i][j].shrink_to_fit();
          }
        } else {
          // Mark this entry as invalid
          m_sizes[i][j] = UINT64_MAX;
          m_union[i][j].clear();
          m_union[i][j].shrink_to_fit();
        }
      }
    }
  }

  // Choose the maximal set of fragments with cardinality in [min, max]
  // with the minimum size, scanning from the largest cardinality down.
  uint64_t min_size = UINT64_MAX;
  size_t min_col = 0;
  for (int i = row_num - 1; (i >= 0 && i >= (int)min - 1); --i) {
    min_size = UINT64_MAX;
    for (size_t j = 0; j < col_num; ++j) {
      // Update the min size if the new size is more than 25% smaller.
      // This is to give preference to earlier fragment sets, in case
      // the user writes in *approximately* equal batches. Otherwise,
      // fragment sets in the middle of the timeline may get consolidated,
      // which will hinder the next step of consolidation (which will
      // select some small and some big fragments).
      if (min_size == UINT64_MAX || m_sizes[i][j] < (min_size / 1.25)) {
        min_size = m_sizes[i][j];
        min_col = j;
      }
    }

    // Results not found at this cardinality; try a smaller run length.
    if (min_size == UINT64_MAX)
      continue;

    // Results found
    for (size_t f = min_col; f <= min_col + i; ++f) {
      to_consolidate->emplace_back(
          fragments[f].uri(), fragments[f].timestamp_range());
    }
    *union_non_empty_domains = m_union[i][min_col];
    break;
  }

  return Status::Ok();
}
723
compute_new_fragment_uri(const URI & first,const URI & last,uint32_t format_version,URI * new_uri) const724 Status Consolidator::compute_new_fragment_uri(
725 const URI& first,
726 const URI& last,
727 uint32_t format_version,
728 URI* new_uri) const {
729 // Get uuid
730 std::string uuid;
731 RETURN_NOT_OK(uuid::generate_uuid(&uuid, false));
732
733 // For creating the new fragment URI
734
735 // Get timestamp ranges
736 std::pair<uint64_t, uint64_t> t_first, t_last;
737 RETURN_NOT_OK(utils::parse::get_timestamp_range(first, &t_first));
738 RETURN_NOT_OK(utils::parse::get_timestamp_range(last, &t_last));
739
740 // Create new URI
741 std::stringstream ss;
742 ss << first.parent().to_string() << "/__" << t_first.first << "_"
743 << t_last.second << "_" << uuid << "_" << format_version;
744
745 *new_uri = URI(ss.str());
746
747 return Status::Ok();
748 }
749
/**
 * Attaches the preallocated buffers to the given query, in the same
 * fixed order used by `create_buffers`: attributes first, then (for
 * sparse arrays) dimensions. `bid` tracks the next unused buffer:
 * fixed-sized fields consume one buffer, var-sized fields two (offsets
 * at `bid`, data at `bid + 1`), and nullable attributes one extra
 * validity buffer.
 *
 * @param query The query to set the buffers on.
 * @param buffers The preallocated buffers.
 * @param buffer_sizes The corresponding buffer sizes.
 * @return Status
 */
Status Consolidator::set_query_buffers(
    Query* query,
    std::vector<ByteVec>* buffers,
    std::vector<uint64_t>* buffer_sizes) const {
  auto array_schema = query->array_schema();
  auto dim_num = array_schema->dim_num();
  auto dense = array_schema->dense();
  auto attributes = array_schema->attributes();
  unsigned bid = 0;
  for (const auto& attr : attributes) {
    if (!attr->var_size()) {
      if (!attr->nullable()) {
        // Fixed-sized, non-nullable: one data buffer.
        RETURN_NOT_OK(query->set_data_buffer(
            attr->name(), (void*)&(*buffers)[bid][0], &(*buffer_sizes)[bid]));
        ++bid;
      } else {
        // Fixed-sized, nullable: data buffer + validity bytemap.
        RETURN_NOT_OK(query->set_buffer_vbytemap(
            attr->name(),
            (void*)&(*buffers)[bid][0],
            &(*buffer_sizes)[bid],
            (uint8_t*)&(*buffers)[bid + 1][0],
            &(*buffer_sizes)[bid + 1]));
        bid += 2;
      }
    } else {
      if (!attr->nullable()) {
        // Var-sized, non-nullable: offsets at bid, data at bid + 1.
        RETURN_NOT_OK(query->set_data_buffer(
            attr->name(),
            (void*)&(*buffers)[bid + 1][0],
            &(*buffer_sizes)[bid + 1]));
        RETURN_NOT_OK(query->set_offsets_buffer(
            attr->name(),
            (uint64_t*)&(*buffers)[bid][0],
            &(*buffer_sizes)[bid]));
        bid += 2;
      } else {
        // Var-sized, nullable: offsets, data, and validity bytemap.
        RETURN_NOT_OK(query->set_buffer_vbytemap(
            attr->name(),
            (uint64_t*)&(*buffers)[bid][0],
            &(*buffer_sizes)[bid],
            (void*)&(*buffers)[bid + 1][0],
            &(*buffer_sizes)[bid + 1],
            (uint8_t*)&(*buffers)[bid + 2][0],
            &(*buffer_sizes)[bid + 2]));
        bid += 3;
      }
    }
  }
  // Sparse arrays also carry coordinate buffers per dimension.
  if (!dense) {
    for (unsigned d = 0; d < dim_num; ++d) {
      auto dim = array_schema->dimension(d);
      auto dim_name = dim->name();
      if (!dim->var_size()) {
        RETURN_NOT_OK(query->set_data_buffer(
            dim_name, (void*)&(*buffers)[bid][0], &(*buffer_sizes)[bid]));
        ++bid;
      } else {
        RETURN_NOT_OK(query->set_data_buffer(
            dim_name,
            (void*)&(*buffers)[bid + 1][0],
            &(*buffer_sizes)[bid + 1]));
        RETURN_NOT_OK(query->set_offsets_buffer(
            dim_name, (uint64_t*)&(*buffers)[bid][0], &(*buffer_sizes)[bid]));
        bid += 2;
      }
    }
  }

  return Status::Ok();
}
820
/**
 * Rebuilds the fragment info after a consolidation step: the fragments
 * in `to_consolidate` are replaced (in place, preserving order) by the
 * single `new_fragment_info`, while all other fragments are kept.
 *
 * Assumes `to_consolidate` is a subsequence of the fragments in
 * `fragment_info`, in the same order — both iterators advance in
 * lockstep on a match.
 *
 * @param to_consolidate The fragments that were consolidated.
 * @param new_fragment_info Info of the new consolidated fragment.
 * @param fragment_info In/out: the fragment info to update.
 */
void Consolidator::update_fragment_info(
    const std::vector<TimestampedURI>& to_consolidate,
    const SingleFragmentInfo& new_fragment_info,
    FragmentInfo* fragment_info) const {
  auto to_consolidate_it = to_consolidate.begin();
  auto fragment_it = fragment_info->fragments().begin();
  FragmentInfo updated_fragment_info;
  bool new_fragment_added = false;

  while (fragment_it != fragment_info->fragments().end()) {
    // No match - add the fragment info and advance `fragment_it`
    if (to_consolidate_it == to_consolidate.end() ||
        fragment_it->uri().to_string() != to_consolidate_it->uri_.to_string()) {
      updated_fragment_info.append(*fragment_it);
      ++fragment_it;
    } else {  // Match - add new fragment only once and advance both iterators
      if (!new_fragment_added) {
        updated_fragment_info.append(new_fragment_info);
        new_fragment_added = true;
      }
      ++fragment_it;
      ++to_consolidate_it;
    }
  }

  // The new count must be the old count minus the consolidated
  // fragments plus the one new fragment.
  assert(
      updated_fragment_info.fragment_num() ==
      fragment_info->fragment_num() - to_consolidate.size() + 1);

  *fragment_info = std::move(updated_fragment_info);
}
852
set_config(const Config * config)853 Status Consolidator::set_config(const Config* config) {
854 // Set the config
855 Config merged_config = storage_manager_->config();
856 if (config)
857 merged_config.inherit(*config);
858 bool found = false;
859 config_.amplification_ = 0.0f;
860 RETURN_NOT_OK(merged_config.get<float>(
861 "sm.consolidation.amplification", &config_.amplification_, &found));
862 assert(found);
863 config_.steps_ = 0;
864 RETURN_NOT_OK(merged_config.get<uint32_t>(
865 "sm.consolidation.steps", &config_.steps_, &found));
866 assert(found);
867 config_.buffer_size_ = 0;
868 RETURN_NOT_OK(merged_config.get<uint64_t>(
869 "sm.consolidation.buffer_size", &config_.buffer_size_, &found));
870 assert(found);
871 config_.size_ratio_ = 0.0f;
872 RETURN_NOT_OK(merged_config.get<float>(
873 "sm.consolidation.step_size_ratio", &config_.size_ratio_, &found));
874 assert(found);
875 config_.min_frags_ = 0;
876 RETURN_NOT_OK(merged_config.get<uint32_t>(
877 "sm.consolidation.step_min_frags", &config_.min_frags_, &found));
878 assert(found);
879 config_.max_frags_ = 0;
880 RETURN_NOT_OK(merged_config.get<uint32_t>(
881 "sm.consolidation.step_max_frags", &config_.max_frags_, &found));
882 assert(found);
883 const std::string mode = merged_config.get("sm.consolidation.mode", &found);
884 if (!found)
885 return logger_->status(Status::ConsolidatorError(
886 "Cannot consolidate; Consolidation mode cannot be null"));
887 config_.mode_ = mode;
888 RETURN_NOT_OK(merged_config.get<uint64_t>(
889 "sm.consolidation.timestamp_start", &config_.timestamp_start_, &found));
890 assert(found);
891 RETURN_NOT_OK(merged_config.get<uint64_t>(
892 "sm.consolidation.timestamp_end", &config_.timestamp_end_, &found));
893 assert(found);
894 std::string reader =
895 merged_config.get("sm.query.sparse_global_order.reader", &found);
896 assert(found);
897 config_.use_refactored_reader_ = reader.compare("reafctored") == 0;
898
899 // Sanity checks
900 if (config_.min_frags_ > config_.max_frags_)
901 return logger_->status(Status::ConsolidatorError(
902 "Invalid configuration; Minimum fragments config parameter is larger "
903 "than the maximum"));
904 if (config_.size_ratio_ > 1.0f || config_.size_ratio_ < 0.0f)
905 return logger_->status(Status::ConsolidatorError(
906 "Invalid configuration; Step size ratio config parameter must be in "
907 "[0.0, 1.0]"));
908 if (config_.amplification_ < 0)
909 return logger_->status(
910 Status::ConsolidatorError("Invalid configuration; Amplification config "
911 "parameter must be non-negative"));
912
913 return Status::Ok();
914 }
915
write_vacuum_file(const URI & new_uri,const std::vector<TimestampedURI> & to_consolidate) const916 Status Consolidator::write_vacuum_file(
917 const URI& new_uri,
918 const std::vector<TimestampedURI>& to_consolidate) const {
919 URI vac_uri = URI(new_uri.to_string() + constants::vacuum_file_suffix);
920
921 std::stringstream ss;
922 for (const auto& timestampedURI : to_consolidate)
923 ss << timestampedURI.uri_.to_string() << "\n";
924
925 auto data = ss.str();
926 RETURN_NOT_OK(
927 storage_manager_->vfs()->write(vac_uri, data.c_str(), data.size()));
928 RETURN_NOT_OK(storage_manager_->vfs()->close_file(vac_uri));
929
930 return Status::Ok();
931 }
932
933 } // namespace sm
934 } // namespace tiledb
935