1 /**
2 * @file reader.cc
3 *
4 * @section LICENSE
5 *
6 * The MIT License
7 *
8 * @copyright Copyright (c) 2017-2021 TileDB, Inc.
9 *
10 * Permission is hereby granted, free of charge, to any person obtaining a copy
11 * of this software and associated documentation files (the "Software"), to deal
12 * in the Software without restriction, including without limitation the rights
13 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
14 * copies of the Software, and to permit persons to whom the Software is
15 * furnished to do so, subject to the following conditions:
16 *
17 * The above copyright notice and this permission notice shall be included in
18 * all copies or substantial portions of the Software.
19 *
20 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
21 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
22 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
23 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
24 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
25 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
26 * THE SOFTWARE.
27 *
28 * @section DESCRIPTION
29 *
30 * This file implements class Reader.
31 */
32
33 #include "tiledb/sm/query/reader.h"
34 #include "tiledb/common/logger.h"
35 #include "tiledb/sm/array/array.h"
36 #include "tiledb/sm/array_schema/array_schema.h"
37 #include "tiledb/sm/array_schema/dimension.h"
38 #include "tiledb/sm/fragment/fragment_metadata.h"
39 #include "tiledb/sm/misc/comparators.h"
40 #include "tiledb/sm/misc/hilbert.h"
41 #include "tiledb/sm/misc/parallel_functions.h"
42 #include "tiledb/sm/misc/utils.h"
43 #include "tiledb/sm/query/query_macros.h"
44 #include "tiledb/sm/query/read_cell_slab_iter.h"
45 #include "tiledb/sm/query/result_tile.h"
46 #include "tiledb/sm/stats/global_stats.h"
47 #include "tiledb/sm/storage_manager/storage_manager.h"
48 #include "tiledb/sm/subarray/cell_slab.h"
49 #include "tiledb/sm/tile/generic_tile_io.h"
50
51 using namespace tiledb;
52 using namespace tiledb::common;
53 using namespace tiledb::sm::stats;
54
55 namespace tiledb {
56 namespace sm {
57
58 namespace {
59 /**
60 * If the given iterator points to an "invalid" element, advance it until the
61 * pointed-to element is valid, or `end`. Validity is determined by calling
62 * `it->valid()`.
63 *
64 * Example:
65 *
66 * @code{.cpp}
67 * std::vector<T> vec = ...;
68 * // Get an iterator to the first valid vec element, or vec.end() if the
69 * // vector is empty or only contains invalid elements.
70 * auto it = skip_invalid_elements(vec.begin(), vec.end());
71 * // If there was a valid element, now advance the iterator to the next
72 * // valid element (or vec.end() if there are no more).
73 * it = skip_invalid_elements(++it, vec.end());
74 * @endcode
75 *
76 *
77 * @tparam IterT The iterator type
78 * @param it The iterator
79 * @param end The end iterator value
80 * @return Iterator pointing to a valid element, or `end`.
81 */
template <typename IterT>
inline IterT skip_invalid_elements(IterT it, const IterT& end) {
  // Walk forward until a valid element is found or the range is exhausted.
  for (; it != end; ++it) {
    if (it->valid())
      break;
  }
  return it;
}
89 } // namespace
90
91 /* ****************************** */
92 /* CONSTRUCTORS */
93 /* ****************************** */
94
// Constructs a Reader by delegating all state to ReaderBase.
// `++logger_id_` gives each Reader instance a uniquely-numbered logger
// clone so log lines from concurrent readers can be told apart.
Reader::Reader(
    stats::Stats* stats,
    tdb_shared_ptr<Logger> logger,
    StorageManager* storage_manager,
    Array* array,
    Config& config,
    std::unordered_map<std::string, QueryBuffer>& buffers,
    Subarray& subarray,
    Layout layout,
    QueryCondition& condition)
    : ReaderBase(
          stats,
          logger->clone("Reader", ++logger_id_),
          storage_manager,
          array,
          config,
          buffers,
          subarray,
          layout,
          condition) {
}
116
117 /* ****************************** */
118 /* API */
119 /* ****************************** */
120
// Finalizing a read requires no work; this is intentionally a no-op
// that always succeeds.
Status Reader::finalize() {
  return Status::Ok();
}
124
incomplete() const125 bool Reader::incomplete() const {
126 return read_state_.overflowed_ || !read_state_.done();
127 }
128
// Validates the reader configuration and prepares the read state.
// Must succeed before `dowork` may be called.
Status Reader::init() {
  // Sanity checks
  if (storage_manager_ == nullptr)
    return logger_->status(Status::ReaderError(
        "Cannot initialize reader; Storage manager not set"));
  if (array_schema_ == nullptr)
    return logger_->status(Status::ReaderError(
        "Cannot initialize reader; Array metadata not set"));
  if (buffers_.empty())
    return logger_->status(
        Status::ReaderError("Cannot initialize reader; Buffers not set"));
  if (array_schema_->dense() && !subarray_.is_set())
    return logger_->status(Status::ReaderError(
        "Cannot initialize reader; Dense reads must have a subarray set"));

  // Check subarray
  RETURN_NOT_OK(check_subarray());

  // Initialize the read state
  RETURN_NOT_OK(init_read_state());

  // Check the validity buffer sizes. This must be performed
  // after `init_read_state` to ensure we have set the
  // member state correctly from the config.
  RETURN_NOT_OK(check_validity_buffer_sizes());

  return Status::Ok();
}
157
// This reader has no memory budget to set up; intentionally a no-op.
Status Reader::initialize_memory_budget() {
  return Status::Ok();
}
161
// Returns a read-only view of the reader's read state.
const Reader::ReadState* Reader::read_state() const {
  return &read_state_;
}
165
// Returns a mutable pointer to the reader's read state.
Reader::ReadState* Reader::read_state() {
  return &read_state_;
}
169
complete_read_loop()170 Status Reader::complete_read_loop() {
171 if (offsets_extra_element_) {
172 RETURN_NOT_OK(add_extra_offset());
173 }
174
175 return Status::Ok();
176 }
177
// Main entry point of a read. Iterates over subarray partitions until
// results are produced, the current partition becomes unsplittable, or
// the read is done.
Status Reader::dowork() {
  // Check that the query condition is valid.
  RETURN_NOT_OK(condition_.check(array_schema_));

  get_dim_attr_stats();

  auto timer_se = stats_->start_timer("read");

  auto dense_mode = array_schema_->dense();

  // Get next partition
  if (!read_state_.unsplittable_)
    RETURN_NOT_OK(read_state_.next());

  // Handle empty array or empty/finished subarray
  if (!dense_mode && fragment_metadata_.empty()) {
    zero_out_buffer_sizes();
    return Status::Ok();
  }

  // Loop until you find results, or unsplittable, or done
  do {
    stats_->add_counter("loop_num", 1);

    // Reset per-iteration state before attempting the next partition.
    read_state_.overflowed_ = false;
    copy_overflowed_ = false;
    reset_buffer_sizes();

    // Perform read
    if (dense_mode) {
      RETURN_NOT_OK(dense_read());
    } else {
      RETURN_NOT_OK(sparse_read());
    }

    // In the case of overflow, we need to split the current partition
    // without advancing to the next partition
    if (read_state_.overflowed_) {
      zero_out_buffer_sizes();
      RETURN_NOT_OK(read_state_.split_current());

      // A partition that cannot be split further ends the loop even
      // though no results fit in the buffers.
      if (read_state_.unsplittable_) {
        return complete_read_loop();
      }
    } else {
      // Determine whether any user buffer received data this iteration.
      bool has_results = false;
      for (const auto& it : buffers_) {
        if (*(it.second.buffer_size_) != 0)
          has_results = true;
      }

      // Need to reset unsplittable if the results fit after all
      if (has_results)
        read_state_.unsplittable_ = false;

      if (has_results || read_state_.done()) {
        return complete_read_loop();
      }

      // Empty partition; advance to the next one and try again.
      RETURN_NOT_OK(read_state_.next());
    }
  } while (true);

  // Unreachable: the loop above exits only via the returns inside it.
  return Status::Ok();
}
243
// Intentionally a no-op: this reader has nothing to reset here.
void Reader::reset() {
}
246
247 /* ****************************** */
248 /* PRIVATE METHODS */
249 /* ****************************** */
250
// Coalesces the sorted `result_coords` into maximal runs of contiguous
// cell positions within the same tile, appending one ResultCellSlab per
// run. Invalid coordinates (e.g. removed by de-duplication) are skipped.
Status Reader::compute_result_cell_slabs(
    const std::vector<ResultCoords>& result_coords,
    std::vector<ResultCellSlab>* result_cell_slabs) const {
  auto timer_se =
      stats_->start_timer("compute_sparse_result_cell_slabs_sparse");

  // Trivial case
  auto coords_num = (uint64_t)result_coords.size();
  if (coords_num == 0)
    return Status::Ok();

  // Initialize the first range
  auto coords_end = result_coords.end();
  auto it = skip_invalid_elements(result_coords.begin(), coords_end);
  if (it == coords_end) {
    // Non-empty input but every coordinate was invalid.
    return logger_->status(Status::ReaderError("Unexpected empty cell range."));
  }
  uint64_t start_pos = it->pos_;
  uint64_t end_pos = start_pos;
  ResultTile* tile = it->tile_;

  // Scan the coordinates and compute ranges
  it = skip_invalid_elements(++it, coords_end);
  while (it != coords_end) {
    if (it->tile_ == tile && it->pos_ == end_pos + 1) {
      // Same range - advance end position
      end_pos = it->pos_;
    } else {
      // New range - append previous range
      result_cell_slabs->emplace_back(tile, start_pos, end_pos - start_pos + 1);
      start_pos = it->pos_;
      end_pos = start_pos;
      tile = it->tile_;
    }
    it = skip_invalid_elements(++it, coords_end);
  }

  // Append the last range
  result_cell_slabs->emplace_back(tile, start_pos, end_pos - start_pos + 1);

  return Status::Ok();
}
293
// Computes the result coordinates of a single tile that partially
// overlaps subarray range `range_idx`, appending them to
// `result_coords`. For dense arrays, cells overwritten by more recent
// fragments are additionally excluded.
Status Reader::compute_range_result_coords(
    Subarray* subarray,
    unsigned frag_idx,
    ResultTile* tile,
    uint64_t range_idx,
    std::vector<ResultCoords>* result_coords) {
  auto coords_num = tile->cell_num();
  auto dim_num = array_schema_->dim_num();
  auto cell_order = array_schema_->cell_order();
  auto range_coords = subarray->get_range_coords(range_idx);

  if (array_schema_->dense()) {
    // Start with every cell marked as a result; each per-dimension pass
    // clears the cells falling outside the range on that dimension.
    std::vector<uint8_t> result_bitmap(coords_num, 1);
    std::vector<uint8_t> overwritten_bitmap(coords_num, 0);

    // Compute result and overwritten bitmap per dimension
    for (unsigned d = 0; d < dim_num; ++d) {
      const auto& ranges = subarray->ranges_for_dim(d);
      RETURN_NOT_OK(tile->compute_results_dense(
          d,
          ranges[range_coords[d]],
          fragment_metadata_,
          frag_idx,
          &result_bitmap,
          &overwritten_bitmap));
    }

    // Gather results
    for (uint64_t pos = 0; pos < coords_num; ++pos) {
      if (result_bitmap[pos] && !overwritten_bitmap[pos])
        result_coords->emplace_back(tile, pos);
    }
  } else {  // Sparse
    std::vector<uint8_t> result_bitmap(coords_num, 1);

    // Compute result and overwritten bitmap per dimension
    for (unsigned d = 0; d < dim_num; ++d) {
      // For col-major cell ordering, iterate the dimensions
      // in reverse.
      const unsigned dim_idx =
          cell_order == Layout::COL_MAJOR ? dim_num - d - 1 : d;
      // A default (whole-domain) dimension cannot filter out any cell,
      // so skip the bitmap computation for it.
      if (!subarray->is_default(dim_idx)) {
        const auto& ranges = subarray->ranges_for_dim(dim_idx);
        RETURN_NOT_OK(tile->compute_results_sparse(
            dim_idx,
            ranges[range_coords[dim_idx]],
            &result_bitmap,
            cell_order));
      }
    }

    // Gather results
    for (uint64_t pos = 0; pos < coords_num; ++pos) {
      if (result_bitmap[pos])
        result_coords->emplace_back(tile, pos);
    }
  }

  return Status::Ok();
}
354
// Computes, in parallel over the subarray ranges, the result
// coordinates of each range. Coordinates within a range are sorted and
// de-duplicated unless they belong to a single fragment or the schema
// allows duplicates.
Status Reader::compute_range_result_coords(
    Subarray* subarray,
    const std::vector<bool>& single_fragment,
    const std::map<std::pair<unsigned, uint64_t>, size_t>& result_tile_map,
    std::vector<ResultTile>* result_tiles,
    std::vector<std::vector<ResultCoords>>* range_result_coords) {
  auto timer_se = stats_->start_timer("compute_range_result_coords");

  auto range_num = subarray->range_num();
  range_result_coords->resize(range_num);
  auto cell_order = array_schema_->cell_order();
  auto allows_dups = array_schema_->allows_dups();

  // To de-dupe the ranges, we may need to sort them. If the
  // read layout is UNORDERED, we will sort by the cell layout.
  // If the cell layout is hilbert, we will sort in row-major to
  // avoid the expense of calculating hilbert values.
  Layout sort_layout = layout_;
  if (sort_layout == Layout::UNORDERED) {
    sort_layout = cell_order;
    if (sort_layout == Layout::HILBERT) {
      sort_layout = Layout::ROW_MAJOR;
    }
  }

  auto status = parallel_for(
      storage_manager_->compute_tp(), 0, range_num, [&](uint64_t r) {
        // Compute overlapping coordinates per range
        RETURN_NOT_OK(compute_range_result_coords(
            subarray,
            r,
            result_tile_map,
            result_tiles,
            &((*range_result_coords)[r])));

        // Dedup unless there is a single fragment or array schema allows
        // duplicates
        if (!single_fragment[r] && !allows_dups) {
          RETURN_CANCEL_OR_ERROR(sort_result_coords(
              ((*range_result_coords)[r]).begin(),
              ((*range_result_coords)[r]).end(),
              ((*range_result_coords)[r]).size(),
              sort_layout));
          RETURN_CANCEL_OR_ERROR(
              dedup_result_coords(&((*range_result_coords)[r])));
        }

        return Status::Ok();
      });

  RETURN_NOT_OK(status);

  return Status::Ok();
}
409
compute_range_result_coords(Subarray * subarray,uint64_t range_idx,uint32_t fragment_idx,const std::map<std::pair<unsigned,uint64_t>,size_t> & result_tile_map,std::vector<ResultTile> * result_tiles,std::vector<ResultCoords> * range_result_coords)410 Status Reader::compute_range_result_coords(
411 Subarray* subarray,
412 uint64_t range_idx,
413 uint32_t fragment_idx,
414 const std::map<std::pair<unsigned, uint64_t>, size_t>& result_tile_map,
415 std::vector<ResultTile>* result_tiles,
416 std::vector<ResultCoords>* range_result_coords) {
417 // Skip dense fragments
418 if (fragment_metadata_[fragment_idx]->dense())
419 return Status::Ok();
420
421 auto tr =
422 subarray->tile_overlap(fragment_idx, range_idx)->tile_ranges_.begin();
423 auto tr_end =
424 subarray->tile_overlap(fragment_idx, range_idx)->tile_ranges_.end();
425 auto t = subarray->tile_overlap(fragment_idx, range_idx)->tiles_.begin();
426 auto t_end = subarray->tile_overlap(fragment_idx, range_idx)->tiles_.end();
427
428 while (tr != tr_end || t != t_end) {
429 // Handle tile range
430 if (tr != tr_end && (t == t_end || tr->first < t->first)) {
431 for (uint64_t i = tr->first; i <= tr->second; ++i) {
432 auto pair = std::pair<unsigned, uint64_t>(fragment_idx, i);
433 auto tile_it = result_tile_map.find(pair);
434 assert(tile_it != result_tile_map.end());
435 auto tile_idx = tile_it->second;
436 auto& tile = (*result_tiles)[tile_idx];
437
438 // Add results only if the sparse tile MBR is not fully
439 // covered by a more recent fragment's non-empty domain
440 if (!sparse_tile_overwritten(fragment_idx, i))
441 RETURN_NOT_OK(get_all_result_coords(&tile, range_result_coords));
442 }
443 ++tr;
444 } else {
445 // Handle single tile
446 auto pair = std::pair<unsigned, uint64_t>(fragment_idx, t->first);
447 auto tile_it = result_tile_map.find(pair);
448 assert(tile_it != result_tile_map.end());
449 auto tile_idx = tile_it->second;
450 auto& tile = (*result_tiles)[tile_idx];
451 if (t->second == 1.0) { // Full overlap
452 // Add results only if the sparse tile MBR is not fully
453 // covered by a more recent fragment's non-empty domain
454 if (!sparse_tile_overwritten(fragment_idx, t->first))
455 RETURN_NOT_OK(get_all_result_coords(&tile, range_result_coords));
456 } else { // Partial overlap
457 RETURN_NOT_OK(compute_range_result_coords(
458 subarray, fragment_idx, &tile, range_idx, range_result_coords));
459 }
460 ++t;
461 }
462 }
463
464 return Status::Ok();
465 }
466
compute_range_result_coords(Subarray * subarray,uint64_t range_idx,const std::map<std::pair<unsigned,uint64_t>,size_t> & result_tile_map,std::vector<ResultTile> * result_tiles,std::vector<ResultCoords> * range_result_coords)467 Status Reader::compute_range_result_coords(
468 Subarray* subarray,
469 uint64_t range_idx,
470 const std::map<std::pair<unsigned, uint64_t>, size_t>& result_tile_map,
471 std::vector<ResultTile>* result_tiles,
472 std::vector<ResultCoords>* range_result_coords) {
473 // Gather result range coordinates per fragment
474 auto fragment_num = fragment_metadata_.size();
475 std::vector<std::vector<ResultCoords>> range_result_coords_vec(fragment_num);
476 auto status = parallel_for(
477 storage_manager_->compute_tp(), 0, fragment_num, [&](uint32_t f) {
478 return compute_range_result_coords(
479 subarray,
480 range_idx,
481 f,
482 result_tile_map,
483 result_tiles,
484 &range_result_coords_vec[f]);
485 });
486 RETURN_NOT_OK(status);
487
488 // Consolidate the result coordinates in the single result vector
489 for (const auto& vec : range_result_coords_vec) {
490 for (const auto& r : vec)
491 range_result_coords->emplace_back(r);
492 }
493
494 return Status::Ok();
495 }
496
// Merges the per-range result coordinates into `result_coords`,
// appending only valid coordinates and sorting the newly-appended
// portion in the result layout when necessary.
Status Reader::compute_subarray_coords(
    std::vector<std::vector<ResultCoords>>* range_result_coords,
    std::vector<ResultCoords>* result_coords) {
  auto timer_se = stats_->start_timer("compute_subarray_coords");
  // The input 'result_coords' is already sorted. Save the current size
  // before inserting new elements.
  const size_t result_coords_size = result_coords->size();

  // Add all valid ``range_result_coords`` to ``result_coords``
  for (const auto& rv : *range_result_coords) {
    for (const auto& c : rv) {
      if (c.valid())
        result_coords->emplace_back(c.tile_, c.pos_);
    }
  }

  // No need to sort in UNORDERED layout
  if (layout_ == Layout::UNORDERED)
    return Status::Ok();

  // We should not sort if:
  // - there is a single fragment and global order
  // - there is a single fragment and one dimension
  // - there are multiple fragments and a single range and dups are not allowed
  //   (therefore, the coords in that range have already been sorted)
  auto dim_num = array_schema_->dim_num();
  bool must_sort = true;
  auto allows_dups = array_schema_->allows_dups();
  auto single_range = (range_result_coords->size() == 1);
  if (layout_ == Layout::GLOBAL_ORDER || dim_num == 1) {
    must_sort = !belong_to_single_fragment(
        result_coords->begin() + result_coords_size, result_coords->end());
  } else if (single_range && !allows_dups) {
    must_sort = belong_to_single_fragment(
        result_coords->begin() + result_coords_size, result_coords->end());
  }

  // Sort only the portion appended by this call.
  if (must_sort) {
    RETURN_NOT_OK(sort_result_coords(
        result_coords->begin() + result_coords_size,
        result_coords->end(),
        result_coords->size() - result_coords_size,
        layout_));
  }

  return Status::Ok();
}
544
// Collects the sparse result tiles overlapping the current partition.
// Populates `result_tiles` (unique (fragment, tile) entries),
// `result_tile_map` (mapping (fragment, tile) -> index in
// `result_tiles`), and `single_fragment` (whether each range overlaps
// only one fragment, which lets later passes skip sort/dedup).
Status Reader::compute_sparse_result_tiles(
    std::vector<ResultTile>* result_tiles,
    std::map<std::pair<unsigned, uint64_t>, size_t>* result_tile_map,
    std::vector<bool>* single_fragment) {
  auto timer_se = stats_->start_timer("compute_sparse_result_tiles");

  // For easy reference
  auto domain = array_schema_->domain();
  auto& partitioner = read_state_.partitioner_;
  const auto& subarray = partitioner.current();
  auto range_num = subarray.range_num();
  auto fragment_num = fragment_metadata_.size();
  // Tracks the first (smallest-index) fragment seen per range; any
  // later fragment hitting the same range marks it multi-fragment.
  std::vector<unsigned> first_fragment;
  first_fragment.resize(range_num);
  for (uint64_t r = 0; r < range_num; ++r)
    first_fragment[r] = UINT32_MAX;

  single_fragment->resize(range_num);
  for (uint64_t i = 0; i < range_num; ++i)
    (*single_fragment)[i] = true;

  result_tiles->clear();
  for (unsigned f = 0; f < fragment_num; ++f) {
    // Skip dense fragments
    if (fragment_metadata_[f]->dense())
      continue;

    for (uint64_t r = 0; r < range_num; ++r) {
      // Handle range of tiles (full overlap)
      const auto& tile_ranges = subarray.tile_overlap(f, r)->tile_ranges_;
      for (const auto& tr : tile_ranges) {
        for (uint64_t t = tr.first; t <= tr.second; ++t) {
          auto pair = std::pair<unsigned, uint64_t>(f, t);
          // Add tile only if it does not already exist
          if (result_tile_map->find(pair) == result_tile_map->end()) {
            result_tiles->emplace_back(f, t, domain);
            (*result_tile_map)[pair] = result_tiles->size() - 1;
          }
          // Always check range for multiple fragments
          if (f > first_fragment[r])
            (*single_fragment)[r] = false;
          else
            first_fragment[r] = f;
        }
      }

      // Handle single tiles
      const auto& o_tiles = subarray.tile_overlap(f, r)->tiles_;
      for (const auto& o_tile : o_tiles) {
        auto t = o_tile.first;
        auto pair = std::pair<unsigned, uint64_t>(f, t);
        // Add tile only if it does not already exist
        if (result_tile_map->find(pair) == result_tile_map->end()) {
          result_tiles->emplace_back(f, t, domain);
          (*result_tile_map)[pair] = result_tiles->size() - 1;
        }
        // Always check range for multiple fragments
        if (f > first_fragment[r])
          (*single_fragment)[r] = false;
        else
          first_fragment[r] = f;
      }
    }
  }

  return Status::Ok();
}
612
// Dispatches dense result-cell-slab computation based on the subarray
// layout: row-/col-major go through the row/col path, global order
// through the global path. UNORDERED is not a valid layout here.
template <class T>
Status Reader::compute_result_cell_slabs(
    const Subarray& subarray,
    std::map<const T*, ResultSpaceTile<T>>* result_space_tiles,
    std::vector<ResultCoords>* result_coords,
    std::vector<ResultTile*>* result_tiles,
    std::vector<ResultCellSlab>* result_cell_slabs) const {
  auto timer_se = stats_->start_timer("compute_sparse_result_cell_slabs_dense");

  auto layout = subarray.layout();
  if (layout == Layout::ROW_MAJOR || layout == Layout::COL_MAJOR) {
    uint64_t result_coords_pos = 0;
    std::set<std::pair<unsigned, uint64_t>> frag_tile_set;
    return compute_result_cell_slabs_row_col<T>(
        subarray,
        result_space_tiles,
        result_coords,
        &result_coords_pos,
        result_tiles,
        &frag_tile_set,
        result_cell_slabs);
  } else if (layout == Layout::GLOBAL_ORDER) {
    return compute_result_cell_slabs_global<T>(
        subarray,
        result_space_tiles,
        result_coords,
        result_tiles,
        result_cell_slabs);
  } else {  // UNORDERED
    assert(false);
  }

  return Status::Ok();
}
647
// Computes the result cell slabs for a row-/col-major subarray,
// appending slabs and deduplicated result-tile pointers. The in/out
// `result_coords_pos` and `frag_tile_set` carry state across repeated
// calls (used by the global-order path, which invokes this per tile).
template <class T>
Status Reader::compute_result_cell_slabs_row_col(
    const Subarray& subarray,
    std::map<const T*, ResultSpaceTile<T>>* result_space_tiles,
    std::vector<ResultCoords>* result_coords,
    uint64_t* result_coords_pos,
    std::vector<ResultTile*>* result_tiles,
    std::set<std::pair<unsigned, uint64_t>>* frag_tile_set,
    std::vector<ResultCellSlab>* result_cell_slabs) const {
  // Compute result space tiles. The result space tiles hold all the
  // relevant result tiles of the dense fragments
  compute_result_space_tiles<T>(
      &subarray, read_state_.partitioner_.subarray(), result_space_tiles);

  // Gather result cell slabs and pointers to result tiles
  // `result_tiles` holds pointers to tiles that store actual results,
  // which can be stored either in `sparse_result_tiles` (sparse)
  // or in `result_space_tiles` (dense).
  auto rcs_it = ReadCellSlabIter<T>(
      &subarray, result_space_tiles, result_coords, *result_coords_pos);
  for (rcs_it.begin(); !rcs_it.end(); ++rcs_it) {
    // Add result cell slab
    auto result_cell_slab = rcs_it.result_cell_slab();
    result_cell_slabs->push_back(result_cell_slab);
    // Add result tile, once per unique (fragment, tile) pair.
    if (result_cell_slab.tile_ != nullptr) {
      auto frag_idx = result_cell_slab.tile_->frag_idx();
      auto tile_idx = result_cell_slab.tile_->tile_idx();
      auto frag_tile_tuple = std::pair<unsigned, uint64_t>(frag_idx, tile_idx);
      auto it = frag_tile_set->find(frag_tile_tuple);
      if (it == frag_tile_set->end()) {
        frag_tile_set->insert(frag_tile_tuple);
        result_tiles->push_back(result_cell_slab.tile_);
      }
    }
  }
  // Persist the iterator's position so a subsequent call resumes where
  // this one stopped.
  *result_coords_pos = rcs_it.result_coords_pos();

  return Status::Ok();
}
688
// Computes result cell slabs for a global-order subarray by cropping
// the subarray to each space tile (in tile order) and running the
// row/col computation on each crop, sharing position/tile-set state
// across iterations.
template <class T>
Status Reader::compute_result_cell_slabs_global(
    const Subarray& subarray,
    std::map<const T*, ResultSpaceTile<T>>* result_space_tiles,
    std::vector<ResultCoords>* result_coords,
    std::vector<ResultTile*>* result_tiles,
    std::vector<ResultCellSlab>* result_cell_slabs) const {
  const auto& tile_coords = subarray.tile_coords();
  auto cell_order = array_schema_->cell_order();
  std::vector<Subarray> tile_subarrays;
  tile_subarrays.reserve(tile_coords.size());
  uint64_t result_coords_pos = 0;
  std::set<std::pair<unsigned, uint64_t>> frag_tile_set;

  for (const auto& tc : tile_coords) {
    tile_subarrays.emplace_back(
        subarray.crop_to_tile((const T*)&tc[0], cell_order));
    auto& tile_subarray = tile_subarrays.back();
    tile_subarray.template compute_tile_coords<T>();

    RETURN_NOT_OK(compute_result_cell_slabs_row_col<T>(
        tile_subarray,
        result_space_tiles,
        result_coords,
        &result_coords_pos,
        result_tiles,
        &frag_tile_set,
        result_cell_slabs));
  }

  return Status::Ok();
}
721
// Computes the final sparse result coordinates for the current
// partition: determines the overlapping sparse result tiles, loads and
// unfilters their coordinate tiles (both zipped pre-v5 and unzipped
// v5+ formats), computes per-range coordinates, and merges them sorted
// in the result layout.
Status Reader::compute_result_coords(
    std::vector<ResultTile>* result_tiles,
    std::vector<ResultCoords>* result_coords) {
  auto timer_se = stats_->start_timer("compute_result_coords");

  // Get overlapping tile indexes
  typedef std::pair<unsigned, uint64_t> FragTileTuple;
  std::map<FragTileTuple, size_t> result_tile_map;
  std::vector<bool> single_fragment;

  RETURN_CANCEL_OR_ERROR(compute_sparse_result_tiles(
      result_tiles, &result_tile_map, &single_fragment));

  if (result_tiles->empty())
    return Status::Ok();

  // Create temporary vector with pointers to result tiles, so that
  // `read_tiles`, `unfilter_tiles` below can work without changes
  std::vector<ResultTile*> tmp_result_tiles;
  for (auto& result_tile : *result_tiles)
    tmp_result_tiles.push_back(&result_tile);

  // Preload zipped coordinate tile offsets. Note that this will
  // ignore fragments with a version >= 5.
  auto& subarray = read_state_.partitioner_.current();
  std::vector<std::string> zipped_coords_names = {constants::coords};
  RETURN_CANCEL_OR_ERROR(load_tile_offsets(
      read_state_.partitioner_.subarray(), &zipped_coords_names));

  // Preload unzipped coordinate tile offsets. Note that this will
  // ignore fragments with a version < 5.
  const auto dim_num = array_schema_->dim_num();
  std::vector<std::string> dim_names;
  dim_names.reserve(dim_num);
  for (unsigned d = 0; d < dim_num; ++d)
    dim_names.emplace_back(array_schema_->dimension(d)->name());
  RETURN_CANCEL_OR_ERROR(
      load_tile_offsets(read_state_.partitioner_.subarray(), &dim_names));

  // Read and unfilter zipped coordinate tiles. Note that
  // this will ignore fragments with a version >= 5.
  RETURN_CANCEL_OR_ERROR(
      read_coordinate_tiles(&zipped_coords_names, &tmp_result_tiles));
  RETURN_CANCEL_OR_ERROR(unfilter_tiles(constants::coords, &tmp_result_tiles));

  // Read and unfilter unzipped coordinate tiles. Note that
  // this will ignore fragments with a version < 5.
  RETURN_CANCEL_OR_ERROR(read_coordinate_tiles(&dim_names, &tmp_result_tiles));
  for (const auto& dim_name : dim_names) {
    RETURN_CANCEL_OR_ERROR(unfilter_tiles(dim_name, &tmp_result_tiles));
  }

  // Compute the read coordinates for all fragments for each subarray range.
  std::vector<std::vector<ResultCoords>> range_result_coords;
  RETURN_CANCEL_OR_ERROR(compute_range_result_coords(
      &subarray,
      single_fragment,
      result_tile_map,
      result_tiles,
      &range_result_coords));
  // Free intermediate state as soon as it is no longer needed.
  result_tile_map.clear();

  // Compute final coords (sorted in the result layout) of the whole subarray.
  RETURN_CANCEL_OR_ERROR(
      compute_subarray_coords(&range_result_coords, result_coords));
  range_result_coords.clear();

  return Status::Ok();
}
791
// Invalidates duplicate coordinates in the sorted `result_coords`,
// keeping for each duplicate the occurrence from the most recent
// fragment (larger fragment index). Elements are invalidated in place
// rather than erased; downstream passes skip invalid entries.
Status Reader::dedup_result_coords(
    std::vector<ResultCoords>* result_coords) const {
  auto coords_end = result_coords->end();
  auto it = skip_invalid_elements(result_coords->begin(), coords_end);
  while (it != coords_end) {
    auto next_it = skip_invalid_elements(std::next(it), coords_end);
    if (next_it != coords_end && it->same_coords(*next_it)) {
      if (it->tile_->frag_idx() < next_it->tile_->frag_idx()) {
        // The next occurrence is from a newer fragment: drop the
        // current one and advance.
        it->invalidate();
        it = skip_invalid_elements(++it, coords_end);
      } else {
        // The current occurrence wins: drop the next one but stay put,
        // since further duplicates of `it` may follow.
        next_it->invalidate();
      }
    } else {
      it = skip_invalid_elements(++it, coords_end);
    }
  }
  return Status::Ok();
}
811
// Dispatches the dense read to the template instantiation matching the
// domain type of the first dimension (dense domains are homogeneous).
// All datetime/time types are stored as int64_t.
Status Reader::dense_read() {
  auto type = array_schema_->domain()->dimension(0)->type();
  switch (type) {
    case Datatype::INT8:
      return dense_read<int8_t>();
    case Datatype::UINT8:
      return dense_read<uint8_t>();
    case Datatype::INT16:
      return dense_read<int16_t>();
    case Datatype::UINT16:
      return dense_read<uint16_t>();
    case Datatype::INT32:
      return dense_read<int>();
    case Datatype::UINT32:
      return dense_read<unsigned>();
    case Datatype::INT64:
      return dense_read<int64_t>();
    case Datatype::UINT64:
      return dense_read<uint64_t>();
    case Datatype::DATETIME_YEAR:
    case Datatype::DATETIME_MONTH:
    case Datatype::DATETIME_WEEK:
    case Datatype::DATETIME_DAY:
    case Datatype::DATETIME_HR:
    case Datatype::DATETIME_MIN:
    case Datatype::DATETIME_SEC:
    case Datatype::DATETIME_MS:
    case Datatype::DATETIME_US:
    case Datatype::DATETIME_NS:
    case Datatype::DATETIME_PS:
    case Datatype::DATETIME_FS:
    case Datatype::DATETIME_AS:
    case Datatype::TIME_HR:
    case Datatype::TIME_MIN:
    case Datatype::TIME_SEC:
    case Datatype::TIME_MS:
    case Datatype::TIME_US:
    case Datatype::TIME_NS:
    case Datatype::TIME_PS:
    case Datatype::TIME_FS:
    case Datatype::TIME_AS:
      return dense_read<int64_t>();
    default:
      return logger_->status(Status::ReaderError(
          "Cannot read dense array; Unsupported domain type"));
  }

  // Unreachable: every switch case above returns.
  return Status::Ok();
}
861
// Performs a dense read for integral domain type T: gathers sparse
// result coordinates, computes the dense result cell slabs, applies
// the query condition, then copies attribute values (and coordinates,
// if requested) into the user buffers.
template <class T>
Status Reader::dense_read() {
  // Sanity checks
  assert(std::is_integral<T>::value);

  // Compute result coordinates from the sparse fragments
  // `sparse_result_tiles` will hold all the relevant result tiles of
  // sparse fragments
  std::vector<ResultCoords> result_coords;
  std::vector<ResultTile> sparse_result_tiles;
  RETURN_NOT_OK(compute_result_coords(&sparse_result_tiles, &result_coords));

  // Compute result cell slabs.
  // `result_space_tiles` will hold all the relevant result tiles of
  // dense fragments. `result` tiles will hold pointers to the
  // final result tiles for both sparse and dense fragments.
  std::map<const T*, ResultSpaceTile<T>> result_space_tiles;
  std::vector<ResultCellSlab> result_cell_slabs;
  std::vector<ResultTile*> result_tiles;
  auto& subarray = read_state_.partitioner_.current();

  RETURN_NOT_OK(subarray.compute_tile_coords<T>());
  RETURN_NOT_OK(compute_result_cell_slabs<T>(
      subarray,
      &result_space_tiles,
      &result_coords,
      &result_tiles,
      &result_cell_slabs));

  auto stride = array_schema_->domain()->stride<T>(subarray.layout());
  RETURN_NOT_OK(apply_query_condition(
      &result_cell_slabs,
      &result_tiles,
      read_state_.partitioner_.subarray(),
      stride));

  get_result_tile_stats(result_tiles);
  get_result_cell_stats(result_cell_slabs);

  // Clear sparse coordinate tiles (not needed any more)
  erase_coord_tiles(&sparse_result_tiles);

  // Needed when copying the cells
  RETURN_NOT_OK(copy_attribute_values(
      stride,
      &result_tiles,
      &result_cell_slabs,
      *read_state_.partitioner_.subarray()));
  read_state_.overflowed_ = copy_overflowed_;

  // Fill coordinates if the user requested them
  if (!read_state_.overflowed_ && has_coords())
    RETURN_CANCEL_OR_ERROR(fill_dense_coords<T>(subarray));
  // Re-check overflow: filling coordinates may have overflowed too.
  read_state_.overflowed_ = copy_overflowed_;

  return Status::Ok();
}
919
get_all_result_coords(ResultTile * tile,std::vector<ResultCoords> * result_coords) const920 Status Reader::get_all_result_coords(
921 ResultTile* tile, std::vector<ResultCoords>* result_coords) const {
922 auto coords_num = tile->cell_num();
923 for (uint64_t i = 0; i < coords_num; ++i)
924 result_coords->emplace_back(tile, i);
925
926 return Status::Ok();
927 }
928
has_separate_coords() const929 bool Reader::has_separate_coords() const {
930 for (const auto& it : buffers_) {
931 if (array_schema_->is_dim(it.first))
932 return true;
933 }
934
935 return false;
936 }
937
init_read_state()938 Status Reader::init_read_state() {
939 auto timer_se = stats_->start_timer("init_state");
940
941 // Check subarray
942 if (subarray_.layout() == Layout::GLOBAL_ORDER && subarray_.range_num() != 1)
943 return logger_->status(
944 Status::ReaderError("Cannot initialize read "
945 "state; Multi-range "
946 "subarrays do not "
947 "support global order"));
948
949 // Get config
950 bool found = false;
951 uint64_t memory_budget = 0;
952 RETURN_NOT_OK(
953 config_.get<uint64_t>("sm.memory_budget", &memory_budget, &found));
954 assert(found);
955 uint64_t memory_budget_var = 0;
956 RETURN_NOT_OK(config_.get<uint64_t>(
957 "sm.memory_budget_var", &memory_budget_var, &found));
958 assert(found);
959 offsets_format_mode_ = config_.get("sm.var_offsets.mode", &found);
960 assert(found);
961 if (offsets_format_mode_ != "bytes" && offsets_format_mode_ != "elements") {
962 return logger_->status(
963 Status::ReaderError("Cannot initialize reader; Unsupported offsets "
964 "format in configuration"));
965 }
966 RETURN_NOT_OK(config_.get<bool>(
967 "sm.var_offsets.extra_element", &offsets_extra_element_, &found));
968 assert(found);
969 RETURN_NOT_OK(config_.get<uint32_t>(
970 "sm.var_offsets.bitsize", &offsets_bitsize_, &found));
971 if (offsets_bitsize_ != 32 && offsets_bitsize_ != 64) {
972 return logger_->status(
973 Status::ReaderError("Cannot initialize reader; Unsupported offsets "
974 "bitsize in configuration"));
975 }
976 assert(found);
977
978 // Consider the validity memory budget to be identical to `sm.memory_budget`
979 // because the validity vector is currently a bytemap. When converted to a
980 // bitmap, this can be budgeted as `sm.memory_budget` / 8
981 uint64_t memory_budget_validity = memory_budget;
982
983 // Create read state
984 read_state_.partitioner_ = SubarrayPartitioner(
985 &config_,
986 subarray_,
987 memory_budget,
988 memory_budget_var,
989 memory_budget_validity,
990 storage_manager_->compute_tp(),
991 stats_,
992 logger_);
993 read_state_.overflowed_ = false;
994 read_state_.unsplittable_ = false;
995
996 // Set result size budget
997 for (const auto& a : buffers_) {
998 auto attr_name = a.first;
999 auto buffer_size = a.second.buffer_size_;
1000 auto buffer_var_size = a.second.buffer_var_size_;
1001 auto buffer_validity_size = a.second.validity_vector_.buffer_size();
1002 if (!array_schema_->var_size(attr_name)) {
1003 if (!array_schema_->is_nullable(attr_name)) {
1004 RETURN_NOT_OK(read_state_.partitioner_.set_result_budget(
1005 attr_name.c_str(), *buffer_size));
1006 } else {
1007 RETURN_NOT_OK(read_state_.partitioner_.set_result_budget_nullable(
1008 attr_name.c_str(), *buffer_size, *buffer_validity_size));
1009 }
1010 } else {
1011 if (!array_schema_->is_nullable(attr_name)) {
1012 RETURN_NOT_OK(read_state_.partitioner_.set_result_budget(
1013 attr_name.c_str(), *buffer_size, *buffer_var_size));
1014 } else {
1015 RETURN_NOT_OK(read_state_.partitioner_.set_result_budget_nullable(
1016 attr_name.c_str(),
1017 *buffer_size,
1018 *buffer_var_size,
1019 *buffer_validity_size));
1020 }
1021 }
1022 }
1023
1024 read_state_.unsplittable_ = false;
1025 read_state_.overflowed_ = false;
1026 copy_overflowed_ = false;
1027 read_state_.initialized_ = true;
1028
1029 return Status::Ok();
1030 }
1031
sort_result_coords(std::vector<ResultCoords>::iterator iter_begin,std::vector<ResultCoords>::iterator iter_end,size_t coords_num,Layout layout) const1032 Status Reader::sort_result_coords(
1033 std::vector<ResultCoords>::iterator iter_begin,
1034 std::vector<ResultCoords>::iterator iter_end,
1035 size_t coords_num,
1036 Layout layout) const {
1037 auto timer_se = stats_->start_timer("sort_result_coords");
1038 auto domain = array_schema_->domain();
1039
1040 if (layout == Layout::ROW_MAJOR) {
1041 parallel_sort(
1042 storage_manager_->compute_tp(), iter_begin, iter_end, RowCmp(domain));
1043 } else if (layout == Layout::COL_MAJOR) {
1044 parallel_sort(
1045 storage_manager_->compute_tp(), iter_begin, iter_end, ColCmp(domain));
1046 } else if (layout == Layout::GLOBAL_ORDER) {
1047 if (array_schema_->cell_order() == Layout::HILBERT) {
1048 std::vector<std::pair<uint64_t, uint64_t>> hilbert_values(coords_num);
1049 RETURN_NOT_OK(calculate_hilbert_values(iter_begin, &hilbert_values));
1050 parallel_sort(
1051 storage_manager_->compute_tp(),
1052 hilbert_values.begin(),
1053 hilbert_values.end(),
1054 HilbertCmp(domain, iter_begin));
1055 RETURN_NOT_OK(reorganize_result_coords(iter_begin, &hilbert_values));
1056 } else {
1057 parallel_sort(
1058 storage_manager_->compute_tp(),
1059 iter_begin,
1060 iter_end,
1061 GlobalCmp(domain));
1062 }
1063 } else {
1064 assert(false);
1065 }
1066
1067 return Status::Ok();
1068 }
1069
sparse_read()1070 Status Reader::sparse_read() {
1071 // Compute result coordinates from the sparse fragments
1072 // `sparse_result_tiles` will hold all the relevant result tiles of
1073 // sparse fragments
1074 std::vector<ResultCoords> result_coords;
1075 std::vector<ResultTile> sparse_result_tiles;
1076
1077 RETURN_NOT_OK(compute_result_coords(&sparse_result_tiles, &result_coords));
1078 std::vector<ResultTile*> result_tiles;
1079 for (auto& srt : sparse_result_tiles)
1080 result_tiles.push_back(&srt);
1081
1082 // Compute result cell slabs
1083 std::vector<ResultCellSlab> result_cell_slabs;
1084 RETURN_CANCEL_OR_ERROR(
1085 compute_result_cell_slabs(result_coords, &result_cell_slabs));
1086 result_coords.clear();
1087
1088 apply_query_condition(
1089 &result_cell_slabs, &result_tiles, read_state_.partitioner_.subarray());
1090 get_result_tile_stats(result_tiles);
1091 get_result_cell_stats(result_cell_slabs);
1092
1093 RETURN_NOT_OK(copy_coordinates(&result_tiles, &result_cell_slabs));
1094 RETURN_NOT_OK(copy_attribute_values(
1095 UINT64_MAX,
1096 &result_tiles,
1097 &result_cell_slabs,
1098 *read_state_.partitioner_.subarray()));
1099 read_state_.overflowed_ = copy_overflowed_;
1100
1101 return Status::Ok();
1102 }
1103
Status Reader::add_extra_offset() {
  // Appends one extra offset element at the end of each var-sized buffer's
  // offsets, encoding the total size (in bytes or elements, per the
  // configured offsets format mode).
  for (const auto& it : buffers_) {
    const auto& name = it.first;
    if (!array_schema_->var_size(name))
      continue;

    // Do not apply offset for empty results because we will
    // write backwards and corrupt memory we don't own.
    if (*it.second.buffer_size_ == 0)
      continue;

    // The buffer should always be 0 or divisible by the bytesize.
    assert(!(*it.second.buffer_size_ < offsets_bytesize()));

    auto buffer = static_cast<unsigned char*>(it.second.buffer_);
    if (offsets_format_mode_ == "bytes") {
      // Copy the total var-data byte size into the last offset slot.
      // NOTE(review): copying offsets_bytesize() bytes directly from the
      // uint64_t var-size assumes little-endian layout when the offsets
      // bitsize is 32 — confirm this is an accepted platform assumption.
      memcpy(
          buffer + *it.second.buffer_size_ - offsets_bytesize(),
          it.second.buffer_var_size_,
          offsets_bytesize());
    } else if (offsets_format_mode_ == "elements") {
      // Convert the byte size into an element count before writing it.
      auto elements = *it.second.buffer_var_size_ /
                      datatype_size(array_schema_->type(name));
      memcpy(
          buffer + *it.second.buffer_size_ - offsets_bytesize(),
          &elements,
          offsets_bytesize());
    } else {
      return logger_->status(Status::ReaderError(
          "Cannot add extra offset to buffer; Unsupported offsets format"));
    }
  }

  return Status::Ok();
}
1139
sparse_tile_overwritten(unsigned frag_idx,uint64_t tile_idx) const1140 bool Reader::sparse_tile_overwritten(
1141 unsigned frag_idx, uint64_t tile_idx) const {
1142 const auto& mbr = fragment_metadata_[frag_idx]->mbr(tile_idx);
1143 assert(!mbr.empty());
1144 auto fragment_num = (unsigned)fragment_metadata_.size();
1145 auto domain = array_schema_->domain();
1146
1147 for (unsigned f = frag_idx + 1; f < fragment_num; ++f) {
1148 if (fragment_metadata_[f]->dense() &&
1149 domain->covered(mbr, fragment_metadata_[f]->non_empty_domain()))
1150 return true;
1151 }
1152
1153 return false;
1154 }
1155
erase_coord_tiles(std::vector<ResultTile> * result_tiles) const1156 void Reader::erase_coord_tiles(std::vector<ResultTile>* result_tiles) const {
1157 for (auto& tile : *result_tiles) {
1158 auto dim_num = array_schema_->dim_num();
1159 for (unsigned d = 0; d < dim_num; ++d)
1160 tile.erase_tile(array_schema_->dimension(d)->name());
1161 tile.erase_tile(constants::coords);
1162 }
1163 }
1164
get_result_cell_stats(const std::vector<ResultCellSlab> & result_cell_slabs) const1165 void Reader::get_result_cell_stats(
1166 const std::vector<ResultCellSlab>& result_cell_slabs) const {
1167 uint64_t result_num = 0;
1168 for (const auto& rc : result_cell_slabs)
1169 result_num += rc.length_;
1170 stats_->add_counter("result_num", result_num);
1171 }
1172
get_result_tile_stats(const std::vector<ResultTile * > & result_tiles) const1173 void Reader::get_result_tile_stats(
1174 const std::vector<ResultTile*>& result_tiles) const {
1175 stats_->add_counter("overlap_tile_num", result_tiles.size());
1176
1177 uint64_t cell_num = 0;
1178 for (const auto& rt : result_tiles) {
1179 if (!fragment_metadata_[rt->frag_idx()]->dense())
1180 cell_num += rt->cell_num();
1181 else
1182 cell_num += array_schema_->domain()->cell_num_per_tile();
1183 }
1184 stats_->add_counter("cell_num", cell_num);
1185 }
1186
Status Reader::calculate_hilbert_values(
    std::vector<ResultCoords>::iterator iter_begin,
    std::vector<std::pair<uint64_t, uint64_t>>* hilbert_values) const {
  // Computes, for each result coordinate starting at `iter_begin`, a pair
  // (hilbert value, original position). The caller sorts these pairs and
  // then permutes the coordinates via `reorganize_result_coords`.
  auto timer_se = stats_->start_timer("calculate_hilbert_values");
  auto dim_num = array_schema_->dim_num();
  Hilbert h(dim_num);
  auto bits = h.bits();
  // Largest bucket index representable with `bits` bits per dimension.
  auto max_bucket_val = ((uint64_t)1 << bits) - 1;
  auto coords_num = (uint64_t)hilbert_values->size();

  // Calculate Hilbert values in parallel
  auto status = parallel_for(
      storage_manager_->compute_tp(), 0, coords_num, [&](uint64_t c) {
        // Map each dimension's coordinate of the c-th result onto the
        // discretized [0, max_bucket_val] Hilbert grid.
        std::vector<uint64_t> coords(dim_num);
        for (uint32_t d = 0; d < dim_num; ++d) {
          auto dim = array_schema_->dimension(d);
          coords[d] =
              dim->map_to_uint64(*(iter_begin + c), d, bits, max_bucket_val);
        }
        // Record (hilbert value, original index c) for later sorting.
        (*hilbert_values)[c] =
            std::pair<uint64_t, uint64_t>(h.coords_to_hilbert(&coords[0]), c);
        return Status::Ok();
      });

  // Log the error before propagating it.
  RETURN_NOT_OK_ELSE(status, logger_->status(status));

  return Status::Ok();
}
1215
Status Reader::reorganize_result_coords(
    std::vector<ResultCoords>::iterator iter_begin,
    std::vector<std::pair<uint64_t, uint64_t>>* hilbert_values) const {
  // Applies, in place, the permutation recorded in `hilbert_values` (each
  // entry's `.second` is the source index of the element that belongs at
  // that position) by following permutation cycles. This avoids allocating
  // a second coordinate vector; only one element is held aside at a time.
  auto timer_se = stats_->start_timer("reorganize_result_coords");
  auto coords_num = hilbert_values->size();
  size_t i_src, i_dst;
  ResultCoords pending;
  for (size_t i_dst_first = 0; i_dst_first < coords_num; ++i_dst_first) {
    // Check if this element needs to be permuted
    i_src = (*hilbert_values)[i_dst_first].second;
    if (i_src == i_dst_first)
      continue;

    // Hold aside the element at the cycle's starting slot; it is restored
    // at the cycle's final destination below.
    i_dst = i_dst_first;
    pending = std::move(*(iter_begin + i_dst));

    // Follow the permutation cycle
    do {
      *(iter_begin + i_dst) = std::move(*(iter_begin + i_src));
      // Mark the slot as settled so the outer loop skips it.
      (*hilbert_values)[i_dst].second = i_dst;

      i_dst = i_src;
      i_src = (*hilbert_values)[i_src].second;
    } while (i_src != i_dst_first);

    // Close the cycle with the held-aside element.
    *(iter_begin + i_dst) = std::move(pending);
    (*hilbert_values)[i_dst].second = i_dst;
  }

  return Status::Ok();
}
1247
belong_to_single_fragment(std::vector<ResultCoords>::iterator iter_begin,std::vector<ResultCoords>::iterator iter_end) const1248 bool Reader::belong_to_single_fragment(
1249 std::vector<ResultCoords>::iterator iter_begin,
1250 std::vector<ResultCoords>::iterator iter_end) const {
1251 if (iter_begin == iter_end)
1252 return true;
1253
1254 uint32_t last_frag_idx = iter_begin->tile_->frag_idx();
1255 for (auto it = iter_begin + 1; it != iter_end; ++it) {
1256 if (it->tile_->frag_idx() != last_frag_idx)
1257 return false;
1258 }
1259
1260 return true;
1261 }
1262
1263 } // namespace sm
1264 } // namespace tiledb
1265