/**
 * @file   reader.cc
 *
 * @section LICENSE
 *
 * The MIT License
 *
 * @copyright Copyright (c) 2017-2021 TileDB, Inc.
 *
 * Permission is hereby granted, free of charge, to any person obtaining a copy
 * of this software and associated documentation files (the "Software"), to deal
 * in the Software without restriction, including without limitation the rights
 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
 * copies of the Software, and to permit persons to whom the Software is
 * furnished to do so, subject to the following conditions:
 *
 * The above copyright notice and this permission notice shall be included in
 * all copies or substantial portions of the Software.
 *
 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
 * THE SOFTWARE.
 *
 * @section DESCRIPTION
 *
 * This file implements class Reader.
 */

#include "tiledb/sm/query/reader.h"
#include "tiledb/common/logger.h"
#include "tiledb/sm/array/array.h"
#include "tiledb/sm/array_schema/array_schema.h"
#include "tiledb/sm/array_schema/dimension.h"
#include "tiledb/sm/fragment/fragment_metadata.h"
#include "tiledb/sm/misc/comparators.h"
#include "tiledb/sm/misc/hilbert.h"
#include "tiledb/sm/misc/parallel_functions.h"
#include "tiledb/sm/misc/utils.h"
#include "tiledb/sm/query/query_macros.h"
#include "tiledb/sm/query/read_cell_slab_iter.h"
#include "tiledb/sm/query/result_tile.h"
#include "tiledb/sm/stats/global_stats.h"
#include "tiledb/sm/storage_manager/storage_manager.h"
#include "tiledb/sm/subarray/cell_slab.h"
#include "tiledb/sm/tile/generic_tile_io.h"

using namespace tiledb;
using namespace tiledb::common;
using namespace tiledb::sm::stats;

namespace tiledb {
namespace sm {

namespace {
/**
 * If the given iterator points to an "invalid" element, advance it until the
 * pointed-to element is valid, or `end`. Validity is determined by calling
 * `it->valid()`.
 *
 * Example:
 *
 * @code{.cpp}
 * std::vector<T> vec = ...;
 * // Get an iterator to the first valid vec element, or vec.end() if the
 * // vector is empty or only contains invalid elements.
 * auto it = skip_invalid_elements(vec.begin(), vec.end());
 * // If there was a valid element, now advance the iterator to the next
 * // valid element (or vec.end() if there are no more).
 * it = skip_invalid_elements(++it, vec.end());
 * @endcode
 *
 * @tparam IterT The iterator type
 * @param it The iterator
 * @param end The end iterator value
 * @return Iterator pointing to a valid element, or `end`.
 */
template <typename IterT>
inline IterT skip_invalid_elements(IterT it, const IterT& end) {
  while (it != end && !it->valid()) {
    ++it;
  }
  return it;
}
}  // namespace

/* ****************************** */
/*          CONSTRUCTORS          */
/* ****************************** */

Reader::Reader(
    stats::Stats* stats,
    tdb_shared_ptr<Logger> logger,
    StorageManager* storage_manager,
    Array* array,
    Config& config,
    std::unordered_map<std::string, QueryBuffer>& buffers,
    Subarray& subarray,
    Layout layout,
    QueryCondition& condition)
    : ReaderBase(
          stats,
          logger->clone("Reader", ++logger_id_),
          storage_manager,
          array,
          config,
          buffers,
          subarray,
          layout,
          condition) {
}

/* ****************************** */
/*               API              */
/* ****************************** */

Status Reader::finalize() {
  return Status::Ok();
}

bool Reader::incomplete() const {
  return read_state_.overflowed_ || !read_state_.done();
}

Status Reader::init() {
  // Sanity checks
  if (storage_manager_ == nullptr)
    return logger_->status(Status::ReaderError(
        "Cannot initialize reader; Storage manager not set"));
  if (array_schema_ == nullptr)
    return logger_->status(Status::ReaderError(
        "Cannot initialize reader; Array metadata not set"));
  if (buffers_.empty())
    return logger_->status(
        Status::ReaderError("Cannot initialize reader; Buffers not set"));
  if (array_schema_->dense() && !subarray_.is_set())
    return logger_->status(Status::ReaderError(
        "Cannot initialize reader; Dense reads must have a subarray set"));

  // Check subarray
  RETURN_NOT_OK(check_subarray());

  // Initialize the read state
  RETURN_NOT_OK(init_read_state());

  // Check the validity buffer sizes. This must be performed
  // after `init_read_state` to ensure we have set the
  // member state correctly from the config.
  RETURN_NOT_OK(check_validity_buffer_sizes());

  return Status::Ok();
}

Status Reader::initialize_memory_budget() {
  return Status::Ok();
}

const Reader::ReadState* Reader::read_state() const {
  return &read_state_;
}

Reader::ReadState* Reader::read_state() {
  return &read_state_;
}

Status Reader::complete_read_loop() {
  if (offsets_extra_element_) {
    RETURN_NOT_OK(add_extra_offset());
  }

  return Status::Ok();
}

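/**
 * Note: `dowork()` below is the main entry point for a read; the caller
 * normally resubmits the query while it reports an incomplete status. A rough
 * caller-side sketch using the C++ API (the array URI, attribute name and
 * buffer size are illustrative assumptions, not taken from this file):
 *
 * @code{.cpp}
 * tiledb::Context ctx;
 * tiledb::Array array(ctx, "my_array", TILEDB_READ);
 * tiledb::Query query(ctx, array, TILEDB_READ);
 * std::vector<int32_t> a(1000000);
 * query.set_buffer("a", a);
 * do {
 *   query.submit();  // eventually drives Reader::dowork()
 *   // ... consume the cells written into `a` ...
 * } while (query.query_status() == tiledb::Query::Status::INCOMPLETE);
 * @endcode
 */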
Status Reader::dowork() {
  // Check that the query condition is valid.
  RETURN_NOT_OK(condition_.check(array_schema_));

  get_dim_attr_stats();

  auto timer_se = stats_->start_timer("read");

  auto dense_mode = array_schema_->dense();

  // Get next partition
  if (!read_state_.unsplittable_)
    RETURN_NOT_OK(read_state_.next());

  // Handle empty array or empty/finished subarray
  if (!dense_mode && fragment_metadata_.empty()) {
    zero_out_buffer_sizes();
    return Status::Ok();
  }

  // Loop until results are found, the current partition becomes
  // unsplittable, or the read is done
  do {
    stats_->add_counter("loop_num", 1);

    read_state_.overflowed_ = false;
    copy_overflowed_ = false;
    reset_buffer_sizes();

    // Perform read
    if (dense_mode) {
      RETURN_NOT_OK(dense_read());
    } else {
      RETURN_NOT_OK(sparse_read());
    }

    // In the case of overflow, we need to split the current partition
    // without advancing to the next partition
    if (read_state_.overflowed_) {
      zero_out_buffer_sizes();
      RETURN_NOT_OK(read_state_.split_current());

      if (read_state_.unsplittable_) {
        return complete_read_loop();
      }
    } else {
      bool has_results = false;
      for (const auto& it : buffers_) {
        if (*(it.second.buffer_size_) != 0)
          has_results = true;
      }

      // Reset the unsplittable flag if the results fit after all
      if (has_results)
        read_state_.unsplittable_ = false;

      if (has_results || read_state_.done()) {
        return complete_read_loop();
      }

      RETURN_NOT_OK(read_state_.next());
    }
  } while (true);

  return Status::Ok();
}

void Reader::reset() {
}

/* ****************************** */
/*         PRIVATE METHODS        */
/* ****************************** */

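/**
 * The helper below coalesces consecutive valid coordinates that belong to the
 * same tile and have adjacent positions into a single `ResultCellSlab`. A
 * small, made-up illustration of the expected grouping (positions are
 * hypothetical, not actual TileDB data):
 *
 * @code{.cpp}
 * // Valid coordinate positions within one tile: 2, 3, 4, 7
 * // Resulting slabs:
 * //   ResultCellSlab(tile, 2, 3);  // start = 2, length = 3
 * //   ResultCellSlab(tile, 7, 1);  // start = 7, length = 1
 * @endcode
 */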
Status Reader::compute_result_cell_slabs(
    const std::vector<ResultCoords>& result_coords,
    std::vector<ResultCellSlab>* result_cell_slabs) const {
  auto timer_se =
      stats_->start_timer("compute_sparse_result_cell_slabs_sparse");

  // Trivial case
  auto coords_num = (uint64_t)result_coords.size();
  if (coords_num == 0)
    return Status::Ok();

  // Initialize the first range
  auto coords_end = result_coords.end();
  auto it = skip_invalid_elements(result_coords.begin(), coords_end);
  if (it == coords_end) {
    return logger_->status(Status::ReaderError("Unexpected empty cell range."));
  }
  uint64_t start_pos = it->pos_;
  uint64_t end_pos = start_pos;
  ResultTile* tile = it->tile_;

  // Scan the coordinates and compute ranges
  it = skip_invalid_elements(++it, coords_end);
  while (it != coords_end) {
    if (it->tile_ == tile && it->pos_ == end_pos + 1) {
      // Same range - advance end position
      end_pos = it->pos_;
    } else {
      // New range - append previous range
      result_cell_slabs->emplace_back(tile, start_pos, end_pos - start_pos + 1);
      start_pos = it->pos_;
      end_pos = start_pos;
      tile = it->tile_;
    }
    it = skip_invalid_elements(++it, coords_end);
  }

  // Append the last range
  result_cell_slabs->emplace_back(tile, start_pos, end_pos - start_pos + 1);

  return Status::Ok();
}

Status Reader::compute_range_result_coords(
    Subarray* subarray,
    unsigned frag_idx,
    ResultTile* tile,
    uint64_t range_idx,
    std::vector<ResultCoords>* result_coords) {
  auto coords_num = tile->cell_num();
  auto dim_num = array_schema_->dim_num();
  auto cell_order = array_schema_->cell_order();
  auto range_coords = subarray->get_range_coords(range_idx);

  if (array_schema_->dense()) {
    std::vector<uint8_t> result_bitmap(coords_num, 1);
    std::vector<uint8_t> overwritten_bitmap(coords_num, 0);

    // Compute result and overwritten bitmap per dimension
    for (unsigned d = 0; d < dim_num; ++d) {
      const auto& ranges = subarray->ranges_for_dim(d);
      RETURN_NOT_OK(tile->compute_results_dense(
          d,
          ranges[range_coords[d]],
          fragment_metadata_,
          frag_idx,
          &result_bitmap,
          &overwritten_bitmap));
    }

    // Gather results
    for (uint64_t pos = 0; pos < coords_num; ++pos) {
      if (result_bitmap[pos] && !overwritten_bitmap[pos])
        result_coords->emplace_back(tile, pos);
    }
  } else {  // Sparse
    std::vector<uint8_t> result_bitmap(coords_num, 1);

    // Compute result bitmap per dimension
    for (unsigned d = 0; d < dim_num; ++d) {
      // For col-major cell ordering, iterate the dimensions
      // in reverse.
      const unsigned dim_idx =
          cell_order == Layout::COL_MAJOR ? dim_num - d - 1 : d;
      if (!subarray->is_default(dim_idx)) {
        const auto& ranges = subarray->ranges_for_dim(dim_idx);
        RETURN_NOT_OK(tile->compute_results_sparse(
            dim_idx,
            ranges[range_coords[dim_idx]],
            &result_bitmap,
            cell_order));
      }
    }

    // Gather results
    for (uint64_t pos = 0; pos < coords_num; ++pos) {
      if (result_bitmap[pos])
        result_coords->emplace_back(tile, pos);
    }
  }

  return Status::Ok();
}

Status Reader::compute_range_result_coords(
    Subarray* subarray,
    const std::vector<bool>& single_fragment,
    const std::map<std::pair<unsigned, uint64_t>, size_t>& result_tile_map,
    std::vector<ResultTile>* result_tiles,
    std::vector<std::vector<ResultCoords>>* range_result_coords) {
  auto timer_se = stats_->start_timer("compute_range_result_coords");

  auto range_num = subarray->range_num();
  range_result_coords->resize(range_num);
  auto cell_order = array_schema_->cell_order();
  auto allows_dups = array_schema_->allows_dups();

  // To de-dupe the result coordinates of each range, we may need to sort
  // them first. If the read layout is UNORDERED, we sort by the cell order.
  // If the cell order is Hilbert, we sort in row-major order to avoid the
  // expense of calculating Hilbert values.
  Layout sort_layout = layout_;
  if (sort_layout == Layout::UNORDERED) {
    sort_layout = cell_order;
    if (sort_layout == Layout::HILBERT) {
      sort_layout = Layout::ROW_MAJOR;
    }
  }

  auto status = parallel_for(
      storage_manager_->compute_tp(), 0, range_num, [&](uint64_t r) {
        // Compute overlapping coordinates per range
        RETURN_NOT_OK(compute_range_result_coords(
            subarray,
            r,
            result_tile_map,
            result_tiles,
            &((*range_result_coords)[r])));

        // Dedup unless there is a single fragment or the array schema allows
        // duplicates
        if (!single_fragment[r] && !allows_dups) {
          RETURN_CANCEL_OR_ERROR(sort_result_coords(
              ((*range_result_coords)[r]).begin(),
              ((*range_result_coords)[r]).end(),
              ((*range_result_coords)[r]).size(),
              sort_layout));
          RETURN_CANCEL_OR_ERROR(
              dedup_result_coords(&((*range_result_coords)[r])));
        }

        return Status::Ok();
      });

  RETURN_NOT_OK(status);

  return Status::Ok();
}

Status Reader::compute_range_result_coords(
    Subarray* subarray,
    uint64_t range_idx,
    uint32_t fragment_idx,
    const std::map<std::pair<unsigned, uint64_t>, size_t>& result_tile_map,
    std::vector<ResultTile>* result_tiles,
    std::vector<ResultCoords>* range_result_coords) {
  // Skip dense fragments
  if (fragment_metadata_[fragment_idx]->dense())
    return Status::Ok();

  auto tr =
      subarray->tile_overlap(fragment_idx, range_idx)->tile_ranges_.begin();
  auto tr_end =
      subarray->tile_overlap(fragment_idx, range_idx)->tile_ranges_.end();
  auto t = subarray->tile_overlap(fragment_idx, range_idx)->tiles_.begin();
  auto t_end = subarray->tile_overlap(fragment_idx, range_idx)->tiles_.end();

  while (tr != tr_end || t != t_end) {
    // Handle tile range
    if (tr != tr_end && (t == t_end || tr->first < t->first)) {
      for (uint64_t i = tr->first; i <= tr->second; ++i) {
        auto pair = std::pair<unsigned, uint64_t>(fragment_idx, i);
        auto tile_it = result_tile_map.find(pair);
        assert(tile_it != result_tile_map.end());
        auto tile_idx = tile_it->second;
        auto& tile = (*result_tiles)[tile_idx];

        // Add results only if the sparse tile MBR is not fully
        // covered by a more recent fragment's non-empty domain
        if (!sparse_tile_overwritten(fragment_idx, i))
          RETURN_NOT_OK(get_all_result_coords(&tile, range_result_coords));
      }
      ++tr;
    } else {
      // Handle single tile
      auto pair = std::pair<unsigned, uint64_t>(fragment_idx, t->first);
      auto tile_it = result_tile_map.find(pair);
      assert(tile_it != result_tile_map.end());
      auto tile_idx = tile_it->second;
      auto& tile = (*result_tiles)[tile_idx];
      if (t->second == 1.0) {  // Full overlap
        // Add results only if the sparse tile MBR is not fully
        // covered by a more recent fragment's non-empty domain
        if (!sparse_tile_overwritten(fragment_idx, t->first))
          RETURN_NOT_OK(get_all_result_coords(&tile, range_result_coords));
      } else {  // Partial overlap
        RETURN_NOT_OK(compute_range_result_coords(
            subarray, fragment_idx, &tile, range_idx, range_result_coords));
      }
      ++t;
    }
  }

  return Status::Ok();
}

Status Reader::compute_range_result_coords(
    Subarray* subarray,
    uint64_t range_idx,
    const std::map<std::pair<unsigned, uint64_t>, size_t>& result_tile_map,
    std::vector<ResultTile>* result_tiles,
    std::vector<ResultCoords>* range_result_coords) {
  // Gather result range coordinates per fragment
  auto fragment_num = fragment_metadata_.size();
  std::vector<std::vector<ResultCoords>> range_result_coords_vec(fragment_num);
  auto status = parallel_for(
      storage_manager_->compute_tp(), 0, fragment_num, [&](uint32_t f) {
        return compute_range_result_coords(
            subarray,
            range_idx,
            f,
            result_tile_map,
            result_tiles,
            &range_result_coords_vec[f]);
      });
  RETURN_NOT_OK(status);

  // Consolidate the result coordinates in the single result vector
  for (const auto& vec : range_result_coords_vec) {
    for (const auto& r : vec)
      range_result_coords->emplace_back(r);
  }

  return Status::Ok();
}

Status Reader::compute_subarray_coords(
    std::vector<std::vector<ResultCoords>>* range_result_coords,
    std::vector<ResultCoords>* result_coords) {
  auto timer_se = stats_->start_timer("compute_subarray_coords");
  // The input 'result_coords' is already sorted. Save the current size
  // before inserting new elements.
  const size_t result_coords_size = result_coords->size();

  // Add all valid ``range_result_coords`` to ``result_coords``
  for (const auto& rv : *range_result_coords) {
    for (const auto& c : rv) {
      if (c.valid())
        result_coords->emplace_back(c.tile_, c.pos_);
    }
  }

  // No need to sort in UNORDERED layout
  if (layout_ == Layout::UNORDERED)
    return Status::Ok();

  // We should not sort if:
  // - there is a single fragment and global order
  // - there is a single fragment and one dimension
  // - there are multiple fragments and a single range and dups are not allowed
  //   (therefore, the coords in that range have already been sorted)
  auto dim_num = array_schema_->dim_num();
  bool must_sort = true;
  auto allows_dups = array_schema_->allows_dups();
  auto single_range = (range_result_coords->size() == 1);
  if (layout_ == Layout::GLOBAL_ORDER || dim_num == 1) {
    must_sort = !belong_to_single_fragment(
        result_coords->begin() + result_coords_size, result_coords->end());
  } else if (single_range && !allows_dups) {
    must_sort = belong_to_single_fragment(
        result_coords->begin() + result_coords_size, result_coords->end());
  }

  if (must_sort) {
    RETURN_NOT_OK(sort_result_coords(
        result_coords->begin() + result_coords_size,
        result_coords->end(),
        result_coords->size() - result_coords_size,
        layout_));
  }

  return Status::Ok();
}

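/**
 * The routine below gathers, for the current subarray partition, one
 * `ResultTile` per overlapping (fragment, tile) pair and records its index in
 * `result_tile_map`. A hedged sketch of how the map is keyed and used (the
 * fragment and tile numbers are illustrative only):
 *
 * @code{.cpp}
 * // Fragment 1, tile 7 overlaps some range and is seen for the first time:
 * //   result_tiles  -> [..., ResultTile(1, 7, domain)]
 * //   result_tile_map[{1, 7}] = result_tiles->size() - 1;
 * // Later lookups for the same (fragment, tile) pair reuse that entry
 * // instead of creating a duplicate ResultTile.
 * @endcode
 */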
Status Reader::compute_sparse_result_tiles(
    std::vector<ResultTile>* result_tiles,
    std::map<std::pair<unsigned, uint64_t>, size_t>* result_tile_map,
    std::vector<bool>* single_fragment) {
  auto timer_se = stats_->start_timer("compute_sparse_result_tiles");

  // For easy reference
  auto domain = array_schema_->domain();
  auto& partitioner = read_state_.partitioner_;
  const auto& subarray = partitioner.current();
  auto range_num = subarray.range_num();
  auto fragment_num = fragment_metadata_.size();
  std::vector<unsigned> first_fragment;
  first_fragment.resize(range_num);
  for (uint64_t r = 0; r < range_num; ++r)
    first_fragment[r] = UINT32_MAX;

  single_fragment->resize(range_num);
  for (uint64_t i = 0; i < range_num; ++i)
    (*single_fragment)[i] = true;

  result_tiles->clear();
  for (unsigned f = 0; f < fragment_num; ++f) {
    // Skip dense fragments
    if (fragment_metadata_[f]->dense())
      continue;

    for (uint64_t r = 0; r < range_num; ++r) {
      // Handle range of tiles (full overlap)
      const auto& tile_ranges = subarray.tile_overlap(f, r)->tile_ranges_;
      for (const auto& tr : tile_ranges) {
        for (uint64_t t = tr.first; t <= tr.second; ++t) {
          auto pair = std::pair<unsigned, uint64_t>(f, t);
          // Add tile only if it does not already exist
          if (result_tile_map->find(pair) == result_tile_map->end()) {
            result_tiles->emplace_back(f, t, domain);
            (*result_tile_map)[pair] = result_tiles->size() - 1;
          }
          // Mark the range as multi-fragment if a second fragment
          // contributes results to it
          if (f > first_fragment[r])
            (*single_fragment)[r] = false;
          else
            first_fragment[r] = f;
        }
      }

      // Handle single tiles
      const auto& o_tiles = subarray.tile_overlap(f, r)->tiles_;
      for (const auto& o_tile : o_tiles) {
        auto t = o_tile.first;
        auto pair = std::pair<unsigned, uint64_t>(f, t);
        // Add tile only if it does not already exist
        if (result_tile_map->find(pair) == result_tile_map->end()) {
          result_tiles->emplace_back(f, t, domain);
          (*result_tile_map)[pair] = result_tiles->size() - 1;
        }
        // Mark the range as multi-fragment if a second fragment
        // contributes results to it
        if (f > first_fragment[r])
          (*single_fragment)[r] = false;
        else
          first_fragment[r] = f;
      }
    }
  }

  return Status::Ok();
}

template <class T>
Status Reader::compute_result_cell_slabs(
    const Subarray& subarray,
    std::map<const T*, ResultSpaceTile<T>>* result_space_tiles,
    std::vector<ResultCoords>* result_coords,
    std::vector<ResultTile*>* result_tiles,
    std::vector<ResultCellSlab>* result_cell_slabs) const {
  auto timer_se = stats_->start_timer("compute_sparse_result_cell_slabs_dense");

  auto layout = subarray.layout();
  if (layout == Layout::ROW_MAJOR || layout == Layout::COL_MAJOR) {
    uint64_t result_coords_pos = 0;
    std::set<std::pair<unsigned, uint64_t>> frag_tile_set;
    return compute_result_cell_slabs_row_col<T>(
        subarray,
        result_space_tiles,
        result_coords,
        &result_coords_pos,
        result_tiles,
        &frag_tile_set,
        result_cell_slabs);
  } else if (layout == Layout::GLOBAL_ORDER) {
    return compute_result_cell_slabs_global<T>(
        subarray,
        result_space_tiles,
        result_coords,
        result_tiles,
        result_cell_slabs);
  } else {  // UNORDERED
    assert(false);
  }

  return Status::Ok();
}

template <class T>
Status Reader::compute_result_cell_slabs_row_col(
    const Subarray& subarray,
    std::map<const T*, ResultSpaceTile<T>>* result_space_tiles,
    std::vector<ResultCoords>* result_coords,
    uint64_t* result_coords_pos,
    std::vector<ResultTile*>* result_tiles,
    std::set<std::pair<unsigned, uint64_t>>* frag_tile_set,
    std::vector<ResultCellSlab>* result_cell_slabs) const {
  // Compute result space tiles. The result space tiles hold all the
  // relevant result tiles of the dense fragments
  compute_result_space_tiles<T>(
      &subarray, read_state_.partitioner_.subarray(), result_space_tiles);

  // Gather result cell slabs and pointers to result tiles
  // `result_tiles` holds pointers to tiles that store actual results,
  // which can be stored either in `sparse_result_tiles` (sparse)
  // or in `result_space_tiles` (dense).
  auto rcs_it = ReadCellSlabIter<T>(
      &subarray, result_space_tiles, result_coords, *result_coords_pos);
  for (rcs_it.begin(); !rcs_it.end(); ++rcs_it) {
    // Add result cell slab
    auto result_cell_slab = rcs_it.result_cell_slab();
    result_cell_slabs->push_back(result_cell_slab);
    // Add result tile
    if (result_cell_slab.tile_ != nullptr) {
      auto frag_idx = result_cell_slab.tile_->frag_idx();
      auto tile_idx = result_cell_slab.tile_->tile_idx();
      auto frag_tile_tuple = std::pair<unsigned, uint64_t>(frag_idx, tile_idx);
      auto it = frag_tile_set->find(frag_tile_tuple);
      if (it == frag_tile_set->end()) {
        frag_tile_set->insert(frag_tile_tuple);
        result_tiles->push_back(result_cell_slab.tile_);
      }
    }
  }
  *result_coords_pos = rcs_it.result_coords_pos();

  return Status::Ok();
}

template <class T>
Status Reader::compute_result_cell_slabs_global(
    const Subarray& subarray,
    std::map<const T*, ResultSpaceTile<T>>* result_space_tiles,
    std::vector<ResultCoords>* result_coords,
    std::vector<ResultTile*>* result_tiles,
    std::vector<ResultCellSlab>* result_cell_slabs) const {
  const auto& tile_coords = subarray.tile_coords();
  auto cell_order = array_schema_->cell_order();
  std::vector<Subarray> tile_subarrays;
  tile_subarrays.reserve(tile_coords.size());
  uint64_t result_coords_pos = 0;
  std::set<std::pair<unsigned, uint64_t>> frag_tile_set;

  for (const auto& tc : tile_coords) {
    tile_subarrays.emplace_back(
        subarray.crop_to_tile((const T*)&tc[0], cell_order));
    auto& tile_subarray = tile_subarrays.back();
    tile_subarray.template compute_tile_coords<T>();

    RETURN_NOT_OK(compute_result_cell_slabs_row_col<T>(
        tile_subarray,
        result_space_tiles,
        result_coords,
        &result_coords_pos,
        result_tiles,
        &frag_tile_set,
        result_cell_slabs));
  }

  return Status::Ok();
}

Status Reader::compute_result_coords(
    std::vector<ResultTile>* result_tiles,
    std::vector<ResultCoords>* result_coords) {
  auto timer_se = stats_->start_timer("compute_result_coords");

  // Get overlapping tile indexes
  typedef std::pair<unsigned, uint64_t> FragTileTuple;
  std::map<FragTileTuple, size_t> result_tile_map;
  std::vector<bool> single_fragment;

  RETURN_CANCEL_OR_ERROR(compute_sparse_result_tiles(
      result_tiles, &result_tile_map, &single_fragment));

  if (result_tiles->empty())
    return Status::Ok();

  // Create temporary vector with pointers to result tiles, so that
  // `read_tiles`, `unfilter_tiles` below can work without changes
  std::vector<ResultTile*> tmp_result_tiles;
  for (auto& result_tile : *result_tiles)
    tmp_result_tiles.push_back(&result_tile);

  // Preload zipped coordinate tile offsets. Note that this will
  // ignore fragments with a version >= 5.
  auto& subarray = read_state_.partitioner_.current();
  std::vector<std::string> zipped_coords_names = {constants::coords};
  RETURN_CANCEL_OR_ERROR(load_tile_offsets(
      read_state_.partitioner_.subarray(), &zipped_coords_names));

  // Preload unzipped coordinate tile offsets. Note that this will
  // ignore fragments with a version < 5.
  const auto dim_num = array_schema_->dim_num();
  std::vector<std::string> dim_names;
  dim_names.reserve(dim_num);
  for (unsigned d = 0; d < dim_num; ++d)
    dim_names.emplace_back(array_schema_->dimension(d)->name());
  RETURN_CANCEL_OR_ERROR(
      load_tile_offsets(read_state_.partitioner_.subarray(), &dim_names));

  // Read and unfilter zipped coordinate tiles. Note that
  // this will ignore fragments with a version >= 5.
  RETURN_CANCEL_OR_ERROR(
      read_coordinate_tiles(&zipped_coords_names, &tmp_result_tiles));
  RETURN_CANCEL_OR_ERROR(unfilter_tiles(constants::coords, &tmp_result_tiles));

  // Read and unfilter unzipped coordinate tiles. Note that
  // this will ignore fragments with a version < 5.
  RETURN_CANCEL_OR_ERROR(read_coordinate_tiles(&dim_names, &tmp_result_tiles));
  for (const auto& dim_name : dim_names) {
    RETURN_CANCEL_OR_ERROR(unfilter_tiles(dim_name, &tmp_result_tiles));
  }

  // Compute the read coordinates for all fragments for each subarray range.
  std::vector<std::vector<ResultCoords>> range_result_coords;
  RETURN_CANCEL_OR_ERROR(compute_range_result_coords(
      &subarray,
      single_fragment,
      result_tile_map,
      result_tiles,
      &range_result_coords));
  result_tile_map.clear();

  // Compute final coords (sorted in the result layout) of the whole subarray.
  RETURN_CANCEL_OR_ERROR(
      compute_subarray_coords(&range_result_coords, result_coords));
  range_result_coords.clear();

  return Status::Ok();
}

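/**
 * Note on the de-duplication below: when two adjacent (already sorted) result
 * coordinates are identical, the copy coming from the older fragment (smaller
 * fragment index) is invalidated so that the most recent write wins. A hedged,
 * made-up trace:
 *
 * @code{.cpp}
 * // Sorted coords: (frag 0, cell X), (frag 2, cell X), (frag 2, cell Y)
 * // After dedup : the frag-0 copy of X is invalidated; X and Y from
 * //               frag 2 remain valid.
 * @endcode
 */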
Status Reader::dedup_result_coords(
    std::vector<ResultCoords>* result_coords) const {
  auto coords_end = result_coords->end();
  auto it = skip_invalid_elements(result_coords->begin(), coords_end);
  while (it != coords_end) {
    auto next_it = skip_invalid_elements(std::next(it), coords_end);
    if (next_it != coords_end && it->same_coords(*next_it)) {
      if (it->tile_->frag_idx() < next_it->tile_->frag_idx()) {
        it->invalidate();
        it = skip_invalid_elements(++it, coords_end);
      } else {
        next_it->invalidate();
      }
    } else {
      it = skip_invalid_elements(++it, coords_end);
    }
  }
  return Status::Ok();
}

Status Reader::dense_read() {
  auto type = array_schema_->domain()->dimension(0)->type();
  switch (type) {
    case Datatype::INT8:
      return dense_read<int8_t>();
    case Datatype::UINT8:
      return dense_read<uint8_t>();
    case Datatype::INT16:
      return dense_read<int16_t>();
    case Datatype::UINT16:
      return dense_read<uint16_t>();
    case Datatype::INT32:
      return dense_read<int>();
    case Datatype::UINT32:
      return dense_read<unsigned>();
    case Datatype::INT64:
      return dense_read<int64_t>();
    case Datatype::UINT64:
      return dense_read<uint64_t>();
    case Datatype::DATETIME_YEAR:
    case Datatype::DATETIME_MONTH:
    case Datatype::DATETIME_WEEK:
    case Datatype::DATETIME_DAY:
    case Datatype::DATETIME_HR:
    case Datatype::DATETIME_MIN:
    case Datatype::DATETIME_SEC:
    case Datatype::DATETIME_MS:
    case Datatype::DATETIME_US:
    case Datatype::DATETIME_NS:
    case Datatype::DATETIME_PS:
    case Datatype::DATETIME_FS:
    case Datatype::DATETIME_AS:
    case Datatype::TIME_HR:
    case Datatype::TIME_MIN:
    case Datatype::TIME_SEC:
    case Datatype::TIME_MS:
    case Datatype::TIME_US:
    case Datatype::TIME_NS:
    case Datatype::TIME_PS:
    case Datatype::TIME_FS:
    case Datatype::TIME_AS:
      return dense_read<int64_t>();
    default:
      return logger_->status(Status::ReaderError(
          "Cannot read dense array; Unsupported domain type"));
  }

  return Status::Ok();
}

template <class T>
Status Reader::dense_read() {
  // Sanity checks
  assert(std::is_integral<T>::value);

  // Compute result coordinates from the sparse fragments
  // `sparse_result_tiles` will hold all the relevant result tiles of
  // sparse fragments
  std::vector<ResultCoords> result_coords;
  std::vector<ResultTile> sparse_result_tiles;
  RETURN_NOT_OK(compute_result_coords(&sparse_result_tiles, &result_coords));

  // Compute result cell slabs.
  // `result_space_tiles` will hold all the relevant result tiles of
  // dense fragments. `result_tiles` will hold pointers to the
  // final result tiles for both sparse and dense fragments.
  std::map<const T*, ResultSpaceTile<T>> result_space_tiles;
  std::vector<ResultCellSlab> result_cell_slabs;
  std::vector<ResultTile*> result_tiles;
  auto& subarray = read_state_.partitioner_.current();

  RETURN_NOT_OK(subarray.compute_tile_coords<T>());
  RETURN_NOT_OK(compute_result_cell_slabs<T>(
      subarray,
      &result_space_tiles,
      &result_coords,
      &result_tiles,
      &result_cell_slabs));

  auto stride = array_schema_->domain()->stride<T>(subarray.layout());
  RETURN_NOT_OK(apply_query_condition(
      &result_cell_slabs,
      &result_tiles,
      read_state_.partitioner_.subarray(),
      stride));

  get_result_tile_stats(result_tiles);
  get_result_cell_stats(result_cell_slabs);

  // Clear sparse coordinate tiles (not needed any more)
  erase_coord_tiles(&sparse_result_tiles);

  // Copy the attribute cell values into the user buffers
  RETURN_NOT_OK(copy_attribute_values(
      stride,
      &result_tiles,
      &result_cell_slabs,
      *read_state_.partitioner_.subarray()));
  read_state_.overflowed_ = copy_overflowed_;

  // Fill coordinates if the user requested them
  if (!read_state_.overflowed_ && has_coords())
    RETURN_CANCEL_OR_ERROR(fill_dense_coords<T>(subarray));
  read_state_.overflowed_ = copy_overflowed_;

  return Status::Ok();
}

Status Reader::get_all_result_coords(
    ResultTile* tile, std::vector<ResultCoords>* result_coords) const {
  auto coords_num = tile->cell_num();
  for (uint64_t i = 0; i < coords_num; ++i)
    result_coords->emplace_back(tile, i);

  return Status::Ok();
}

bool Reader::has_separate_coords() const {
  for (const auto& it : buffers_) {
    if (array_schema_->is_dim(it.first))
      return true;
  }

  return false;
}

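/**
 * `init_read_state()` below reads its memory budgets and offsets options from
 * the config. A rough sketch of how these settings might be tuned before the
 * query is submitted (the values shown are illustrative assumptions, not
 * defaults asserted by this file):
 *
 * @code{.cpp}
 * Config config;
 * config.set("sm.memory_budget", "5368709120");       // fixed-size data
 * config.set("sm.memory_budget_var", "10737418240");  // var-size data
 * config.set("sm.var_offsets.mode", "bytes");         // or "elements"
 * config.set("sm.var_offsets.extra_element", "false");
 * config.set("sm.var_offsets.bitsize", "64");         // or "32"
 * @endcode
 */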
Status Reader::init_read_state() {
  auto timer_se = stats_->start_timer("init_state");

  // Check subarray
  if (subarray_.layout() == Layout::GLOBAL_ORDER && subarray_.range_num() != 1)
    return logger_->status(
        Status::ReaderError("Cannot initialize read "
                            "state; Multi-range "
                            "subarrays do not "
                            "support global order"));

  // Get config
  bool found = false;
  uint64_t memory_budget = 0;
  RETURN_NOT_OK(
      config_.get<uint64_t>("sm.memory_budget", &memory_budget, &found));
  assert(found);
  uint64_t memory_budget_var = 0;
  RETURN_NOT_OK(config_.get<uint64_t>(
      "sm.memory_budget_var", &memory_budget_var, &found));
  assert(found);
  offsets_format_mode_ = config_.get("sm.var_offsets.mode", &found);
  assert(found);
  if (offsets_format_mode_ != "bytes" && offsets_format_mode_ != "elements") {
    return logger_->status(
        Status::ReaderError("Cannot initialize reader; Unsupported offsets "
                            "format in configuration"));
  }
  RETURN_NOT_OK(config_.get<bool>(
      "sm.var_offsets.extra_element", &offsets_extra_element_, &found));
  assert(found);
  RETURN_NOT_OK(config_.get<uint32_t>(
      "sm.var_offsets.bitsize", &offsets_bitsize_, &found));
  if (offsets_bitsize_ != 32 && offsets_bitsize_ != 64) {
    return logger_->status(
        Status::ReaderError("Cannot initialize reader; Unsupported offsets "
                            "bitsize in configuration"));
  }
  assert(found);

  // Consider the validity memory budget to be identical to `sm.memory_budget`
  // because the validity vector is currently a bytemap. When converted to a
  // bitmap, this can be budgeted as `sm.memory_budget` / 8
  uint64_t memory_budget_validity = memory_budget;

  // Create read state
  read_state_.partitioner_ = SubarrayPartitioner(
      &config_,
      subarray_,
      memory_budget,
      memory_budget_var,
      memory_budget_validity,
      storage_manager_->compute_tp(),
      stats_,
      logger_);
  read_state_.overflowed_ = false;
  read_state_.unsplittable_ = false;

  // Set result size budget
  for (const auto& a : buffers_) {
    auto attr_name = a.first;
    auto buffer_size = a.second.buffer_size_;
    auto buffer_var_size = a.second.buffer_var_size_;
    auto buffer_validity_size = a.second.validity_vector_.buffer_size();
    if (!array_schema_->var_size(attr_name)) {
      if (!array_schema_->is_nullable(attr_name)) {
        RETURN_NOT_OK(read_state_.partitioner_.set_result_budget(
            attr_name.c_str(), *buffer_size));
      } else {
        RETURN_NOT_OK(read_state_.partitioner_.set_result_budget_nullable(
            attr_name.c_str(), *buffer_size, *buffer_validity_size));
      }
    } else {
      if (!array_schema_->is_nullable(attr_name)) {
        RETURN_NOT_OK(read_state_.partitioner_.set_result_budget(
            attr_name.c_str(), *buffer_size, *buffer_var_size));
      } else {
        RETURN_NOT_OK(read_state_.partitioner_.set_result_budget_nullable(
            attr_name.c_str(),
            *buffer_size,
            *buffer_var_size,
            *buffer_validity_size));
      }
    }
  }

  read_state_.unsplittable_ = false;
  read_state_.overflowed_ = false;
  copy_overflowed_ = false;
  read_state_.initialized_ = true;

  return Status::Ok();
}

Status Reader::sort_result_coords(
    std::vector<ResultCoords>::iterator iter_begin,
    std::vector<ResultCoords>::iterator iter_end,
    size_t coords_num,
    Layout layout) const {
  auto timer_se = stats_->start_timer("sort_result_coords");
  auto domain = array_schema_->domain();

  if (layout == Layout::ROW_MAJOR) {
    parallel_sort(
        storage_manager_->compute_tp(), iter_begin, iter_end, RowCmp(domain));
  } else if (layout == Layout::COL_MAJOR) {
    parallel_sort(
        storage_manager_->compute_tp(), iter_begin, iter_end, ColCmp(domain));
  } else if (layout == Layout::GLOBAL_ORDER) {
    if (array_schema_->cell_order() == Layout::HILBERT) {
      std::vector<std::pair<uint64_t, uint64_t>> hilbert_values(coords_num);
      RETURN_NOT_OK(calculate_hilbert_values(iter_begin, &hilbert_values));
      parallel_sort(
          storage_manager_->compute_tp(),
          hilbert_values.begin(),
          hilbert_values.end(),
          HilbertCmp(domain, iter_begin));
      RETURN_NOT_OK(reorganize_result_coords(iter_begin, &hilbert_values));
    } else {
      parallel_sort(
          storage_manager_->compute_tp(),
          iter_begin,
          iter_end,
          GlobalCmp(domain));
    }
  } else {
    assert(false);
  }

  return Status::Ok();
}

Status Reader::sparse_read() {
  // Compute result coordinates from the sparse fragments
  // `sparse_result_tiles` will hold all the relevant result tiles of
  // sparse fragments
  std::vector<ResultCoords> result_coords;
  std::vector<ResultTile> sparse_result_tiles;

  RETURN_NOT_OK(compute_result_coords(&sparse_result_tiles, &result_coords));
  std::vector<ResultTile*> result_tiles;
  for (auto& srt : sparse_result_tiles)
    result_tiles.push_back(&srt);

  // Compute result cell slabs
  std::vector<ResultCellSlab> result_cell_slabs;
  RETURN_CANCEL_OR_ERROR(
      compute_result_cell_slabs(result_coords, &result_cell_slabs));
  result_coords.clear();

  RETURN_NOT_OK(apply_query_condition(
      &result_cell_slabs, &result_tiles, read_state_.partitioner_.subarray()));
  get_result_tile_stats(result_tiles);
  get_result_cell_stats(result_cell_slabs);

  RETURN_NOT_OK(copy_coordinates(&result_tiles, &result_cell_slabs));
  RETURN_NOT_OK(copy_attribute_values(
      UINT64_MAX,
      &result_tiles,
      &result_cell_slabs,
      *read_state_.partitioner_.subarray()));
  read_state_.overflowed_ = copy_overflowed_;

  return Status::Ok();
}

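/**
 * `add_extra_offset()` below appends one extra element to each var-sized
 * offsets buffer so that the size of the last value can be derived from the
 * offsets alone. A hedged illustration for a var-sized int32 attribute with
 * 3 cells holding 1, 2 and 2 values (20 bytes of data; the numbers are made
 * up):
 *
 * @code{.cpp}
 * // "bytes" mode    : offsets become {0, 4, 12, 20}  (20 = total data bytes)
 * // "elements" mode : offsets become {0, 1, 3, 5}    (5 = total elements,
 * //                   i.e. data bytes / datatype size)
 * @endcode
 */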
Status Reader::add_extra_offset() {
  for (const auto& it : buffers_) {
    const auto& name = it.first;
    if (!array_schema_->var_size(name))
      continue;

    // Do not apply offset for empty results because we will
    // write backwards and corrupt memory we don't own.
    if (*it.second.buffer_size_ == 0)
      continue;

    // The buffer should always be 0 or divisible by the bytesize.
    assert(!(*it.second.buffer_size_ < offsets_bytesize()));

    auto buffer = static_cast<unsigned char*>(it.second.buffer_);
    if (offsets_format_mode_ == "bytes") {
      memcpy(
          buffer + *it.second.buffer_size_ - offsets_bytesize(),
          it.second.buffer_var_size_,
          offsets_bytesize());
    } else if (offsets_format_mode_ == "elements") {
      auto elements = *it.second.buffer_var_size_ /
                      datatype_size(array_schema_->type(name));
      memcpy(
          buffer + *it.second.buffer_size_ - offsets_bytesize(),
          &elements,
          offsets_bytesize());
    } else {
      return logger_->status(Status::ReaderError(
          "Cannot add extra offset to buffer; Unsupported offsets format"));
    }
  }

  return Status::Ok();
}

bool Reader::sparse_tile_overwritten(
    unsigned frag_idx, uint64_t tile_idx) const {
  const auto& mbr = fragment_metadata_[frag_idx]->mbr(tile_idx);
  assert(!mbr.empty());
  auto fragment_num = (unsigned)fragment_metadata_.size();
  auto domain = array_schema_->domain();

  for (unsigned f = frag_idx + 1; f < fragment_num; ++f) {
    if (fragment_metadata_[f]->dense() &&
        domain->covered(mbr, fragment_metadata_[f]->non_empty_domain()))
      return true;
  }

  return false;
}

void Reader::erase_coord_tiles(std::vector<ResultTile>* result_tiles) const {
  for (auto& tile : *result_tiles) {
    auto dim_num = array_schema_->dim_num();
    for (unsigned d = 0; d < dim_num; ++d)
      tile.erase_tile(array_schema_->dimension(d)->name());
    tile.erase_tile(constants::coords);
  }
}

void Reader::get_result_cell_stats(
    const std::vector<ResultCellSlab>& result_cell_slabs) const {
  uint64_t result_num = 0;
  for (const auto& rc : result_cell_slabs)
    result_num += rc.length_;
  stats_->add_counter("result_num", result_num);
}

void Reader::get_result_tile_stats(
    const std::vector<ResultTile*>& result_tiles) const {
  stats_->add_counter("overlap_tile_num", result_tiles.size());

  uint64_t cell_num = 0;
  for (const auto& rt : result_tiles) {
    if (!fragment_metadata_[rt->frag_idx()]->dense())
      cell_num += rt->cell_num();
    else
      cell_num += array_schema_->domain()->cell_num_per_tile();
  }
  stats_->add_counter("cell_num", cell_num);
}

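/**
 * The helper below maps each result coordinate to a (Hilbert value, position)
 * pair; the pairs are later sorted by Hilbert value and the coordinates are
 * permuted accordingly in `reorganize_result_coords()`. A sketch of the
 * intended flow, mirroring the Hilbert branch of `sort_result_coords()`
 * above:
 *
 * @code{.cpp}
 * std::vector<std::pair<uint64_t, uint64_t>> hilbert_values(coords_num);
 * RETURN_NOT_OK(calculate_hilbert_values(iter_begin, &hilbert_values));
 * // ... sort `hilbert_values` with HilbertCmp ...
 * RETURN_NOT_OK(reorganize_result_coords(iter_begin, &hilbert_values));
 * @endcode
 */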
Status Reader::calculate_hilbert_values(
    std::vector<ResultCoords>::iterator iter_begin,
    std::vector<std::pair<uint64_t, uint64_t>>* hilbert_values) const {
  auto timer_se = stats_->start_timer("calculate_hilbert_values");
  auto dim_num = array_schema_->dim_num();
  Hilbert h(dim_num);
  auto bits = h.bits();
  auto max_bucket_val = ((uint64_t)1 << bits) - 1;
  auto coords_num = (uint64_t)hilbert_values->size();

  // Calculate Hilbert values in parallel
  auto status = parallel_for(
      storage_manager_->compute_tp(), 0, coords_num, [&](uint64_t c) {
        std::vector<uint64_t> coords(dim_num);
        for (uint32_t d = 0; d < dim_num; ++d) {
          auto dim = array_schema_->dimension(d);
          coords[d] =
              dim->map_to_uint64(*(iter_begin + c), d, bits, max_bucket_val);
        }
        (*hilbert_values)[c] =
            std::pair<uint64_t, uint64_t>(h.coords_to_hilbert(&coords[0]), c);
        return Status::Ok();
      });

  RETURN_NOT_OK_ELSE(status, logger_->status(status));

  return Status::Ok();
}

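/**
 * The permutation below is applied in place by following permutation cycles,
 * so no second coordinate vector is allocated. A small, made-up example of a
 * single cycle (indices are hypothetical):
 *
 * @code{.cpp}
 * // hilbert_values[i].second holds the source index for destination i.
 * // Sources {1, 2, 0} for destinations {0, 1, 2} form one cycle:
 * //   save coords[0]; coords[0] = coords[1]; coords[1] = coords[2];
 * //   coords[2] = saved value.
 * @endcode
 */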
Status Reader::reorganize_result_coords(
    std::vector<ResultCoords>::iterator iter_begin,
    std::vector<std::pair<uint64_t, uint64_t>>* hilbert_values) const {
  auto timer_se = stats_->start_timer("reorganize_result_coords");
  auto coords_num = hilbert_values->size();
  size_t i_src, i_dst;
  ResultCoords pending;
  for (size_t i_dst_first = 0; i_dst_first < coords_num; ++i_dst_first) {
    // Check if this element needs to be permuted
    i_src = (*hilbert_values)[i_dst_first].second;
    if (i_src == i_dst_first)
      continue;

    i_dst = i_dst_first;
    pending = std::move(*(iter_begin + i_dst));

    // Follow the permutation cycle
    do {
      *(iter_begin + i_dst) = std::move(*(iter_begin + i_src));
      (*hilbert_values)[i_dst].second = i_dst;

      i_dst = i_src;
      i_src = (*hilbert_values)[i_src].second;
    } while (i_src != i_dst_first);

    *(iter_begin + i_dst) = std::move(pending);
    (*hilbert_values)[i_dst].second = i_dst;
  }

  return Status::Ok();
}

bool Reader::belong_to_single_fragment(
    std::vector<ResultCoords>::iterator iter_begin,
    std::vector<ResultCoords>::iterator iter_end) const {
  if (iter_begin == iter_end)
    return true;

  uint32_t last_frag_idx = iter_begin->tile_->frag_idx();
  for (auto it = iter_begin + 1; it != iter_end; ++it) {
    if (it->tile_->frag_idx() != last_frag_idx)
      return false;
  }

  return true;
}

}  // namespace sm
}  // namespace tiledb