1 /**
2  * @file   reader_base.cc
3  *
4  * @section LICENSE
5  *
6  * The MIT License
7  *
8  * @copyright Copyright (c) 2017-2021 TileDB, Inc.
9  *
10  * Permission is hereby granted, free of charge, to any person obtaining a copy
11  * of this software and associated documentation files (the "Software"), to deal
12  * in the Software without restriction, including without limitation the rights
13  * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
14  * copies of the Software, and to permit persons to whom the Software is
15  * furnished to do so, subject to the following conditions:
16  *
17  * The above copyright notice and this permission notice shall be included in
18  * all copies or substantial portions of the Software.
19  *
20  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
21  * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
22  * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
23  * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
24  * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
25  * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
26  * THE SOFTWARE.
27  *
28  * @section DESCRIPTION
29  *
30  * This file implements class ReaderBase.
31  */
32 
33 #include "tiledb/sm/query/reader_base.h"
34 #include "tiledb/common/logger.h"
35 #include "tiledb/sm/array/array.h"
36 #include "tiledb/sm/array_schema/array_schema.h"
37 #include "tiledb/sm/filesystem/vfs.h"
38 #include "tiledb/sm/fragment/fragment_metadata.h"
39 #include "tiledb/sm/misc/parallel_functions.h"
40 #include "tiledb/sm/query/query_macros.h"
41 #include "tiledb/sm/query/strategy_base.h"
42 #include "tiledb/sm/subarray/cell_slab_iter.h"
43 #include "tiledb/sm/subarray/subarray.h"
44 
45 namespace tiledb {
46 namespace sm {
47 
48 /* ****************************** */
49 /*          CONSTRUCTORS          */
50 /* ****************************** */
51 
/**
 * Constructs the base reader state shared by all reader strategies.
 *
 * Forwards the common query state (stats, logger, storage manager, array,
 * config, buffers, subarray, layout) to `StrategyBase`, stores the query
 * condition, and caches the array's fragment metadata.
 */
ReaderBase::ReaderBase(
    stats::Stats* stats,
    tdb_shared_ptr<Logger> logger,
    StorageManager* storage_manager,
    Array* array,
    Config& config,
    std::unordered_map<std::string, QueryBuffer>& buffers,
    Subarray& subarray,
    Layout layout,
    QueryCondition& condition)
    : StrategyBase(
          stats,
          logger,
          storage_manager,
          array,
          config,
          buffers,
          subarray,
          layout)
    , condition_(condition)
    , fix_var_sized_overflows_(false)
    , clear_coords_tiles_on_copy_(true)
    , copy_overflowed_(false) {
  // `array` may be null here; only read its fragment metadata when present.
  if (array != nullptr)
    fragment_metadata_ = array->fragment_metadata();

  // Initialize the copy end position to "unset" (max, max); strategies
  // update it as they progress through result cells.
  auto uint64_t_max = std::numeric_limits<uint64_t>::max();
  copy_end_ = std::pair<uint64_t, uint64_t>(uint64_t_max, uint64_t_max);
}
81 
82 /* ********************************* */
83 /*          STATIC FUNCTIONS         */
84 /* ********************************* */
85 
86 template <class T>
compute_result_space_tiles(const Domain * domain,const std::vector<std::vector<uint8_t>> & tile_coords,const TileDomain<T> & array_tile_domain,const std::vector<TileDomain<T>> & frag_tile_domains,std::map<const T *,ResultSpaceTile<T>> * result_space_tiles)87 void ReaderBase::compute_result_space_tiles(
88     const Domain* domain,
89     const std::vector<std::vector<uint8_t>>& tile_coords,
90     const TileDomain<T>& array_tile_domain,
91     const std::vector<TileDomain<T>>& frag_tile_domains,
92     std::map<const T*, ResultSpaceTile<T>>* result_space_tiles) {
93   auto fragment_num = (unsigned)frag_tile_domains.size();
94   auto dim_num = array_tile_domain.dim_num();
95   std::vector<T> start_coords;
96   const T* coords;
97   start_coords.resize(dim_num);
98 
99   // For all tile coordinates
100   for (const auto& tc : tile_coords) {
101     coords = (T*)(&(tc[0]));
102     start_coords = array_tile_domain.start_coords(coords);
103 
104     // Create result space tile and insert into the map
105     auto r = result_space_tiles->emplace(coords, ResultSpaceTile<T>());
106     auto& result_space_tile = r.first->second;
107     result_space_tile.set_start_coords(start_coords);
108 
109     // Add fragment info to the result space tile
110     for (unsigned f = 0; f < fragment_num; ++f) {
111       // Check if the fragment overlaps with the space tile
112       if (!frag_tile_domains[f].in_tile_domain(coords))
113         continue;
114 
115       // Check if any previous fragment covers this fragment
116       // for the tile identified by `coords`
117       bool covered = false;
118       for (unsigned j = 0; j < f; ++j) {
119         if (frag_tile_domains[j].covers(coords, frag_tile_domains[f])) {
120           covered = true;
121           break;
122         }
123       }
124 
125       // Exclude this fragment from the space tile
126       if (covered)
127         continue;
128 
129       // Include this fragment in the space tile
130       auto frag_domain = frag_tile_domains[f].domain_slice();
131       auto frag_idx = frag_tile_domains[f].id();
132       result_space_tile.append_frag_domain(frag_idx, frag_domain);
133       auto tile_idx = frag_tile_domains[f].tile_pos(coords);
134       ResultTile result_tile(frag_idx, tile_idx, domain);
135       result_space_tile.set_result_tile(frag_idx, result_tile);
136     }
137   }
138 }
139 
140 /* ****************************** */
141 /*        PROTECTED METHODS       */
142 /* ****************************** */
143 
clear_tiles(const std::string & name,const std::vector<ResultTile * > * result_tiles) const144 void ReaderBase::clear_tiles(
145     const std::string& name,
146     const std::vector<ResultTile*>* result_tiles) const {
147   for (auto& result_tile : *result_tiles)
148     result_tile->erase_tile(name);
149 }
150 
reset_buffer_sizes()151 void ReaderBase::reset_buffer_sizes() {
152   for (auto& it : buffers_) {
153     *(it.second.buffer_size_) = it.second.original_buffer_size_;
154     if (it.second.buffer_var_size_ != nullptr)
155       *(it.second.buffer_var_size_) = it.second.original_buffer_var_size_;
156     if (it.second.validity_vector_.buffer_size() != nullptr)
157       *(it.second.validity_vector_.buffer_size()) =
158           it.second.original_validity_vector_size_;
159   }
160 }
161 
zero_out_buffer_sizes()162 void ReaderBase::zero_out_buffer_sizes() {
163   for (auto& buffer : buffers_) {
164     if (buffer.second.buffer_size_ != nullptr)
165       *(buffer.second.buffer_size_) = 0;
166     if (buffer.second.buffer_var_size_ != nullptr)
167       *(buffer.second.buffer_var_size_) = 0;
168     if (buffer.second.validity_vector_.buffer_size() != nullptr)
169       *(buffer.second.validity_vector_.buffer_size()) = 0;
170   }
171 }
172 
check_subarray() const173 Status ReaderBase::check_subarray() const {
174   if (subarray_.layout() == Layout::GLOBAL_ORDER && subarray_.range_num() != 1)
175     return logger_->status(Status::ReaderError(
176         "Cannot initialize reader; Multi-range subarrays with "
177         "global order layout are not supported"));
178 
179   return Status::Ok();
180 }
181 
check_validity_buffer_sizes() const182 Status ReaderBase::check_validity_buffer_sizes() const {
183   // Verify that the validity buffer size for each
184   // nullable attribute is large enough to contain
185   // a validity value for each cell.
186   for (const auto& it : buffers_) {
187     const std::string& name = it.first;
188     if (array_schema_->is_nullable(name)) {
189       const uint64_t buffer_size = *it.second.buffer_size_;
190 
191       uint64_t min_cell_num = 0;
192       if (array_schema_->var_size(name)) {
193         min_cell_num = buffer_size / constants::cell_var_offset_size;
194 
195         // If the offsets buffer contains an extra element to mark
196         // the offset to the end of the data buffer, we do not need
197         // a validity value for that extra offset.
198         if (offsets_extra_element_)
199           min_cell_num = std::min<uint64_t>(0, min_cell_num - 1);
200       } else {
201         min_cell_num = buffer_size / array_schema_->cell_size(name);
202       }
203 
204       const uint64_t buffer_validity_size =
205           *it.second.validity_vector_.buffer_size();
206       const uint64_t cell_validity_num =
207           buffer_validity_size / constants::cell_validity_size;
208 
209       if (cell_validity_num < min_cell_num) {
210         std::stringstream ss;
211         ss << "Buffer sizes check failed; Invalid number of validity cells "
212               "given for ";
213         ss << "attribute '" << name << "'";
214         ss << " (" << cell_validity_num << " < " << min_cell_num << ")";
215         return logger_->status(Status::ReaderError(ss.str()));
216       }
217     }
218   }
219 
220   return Status::Ok();
221 }
222 
load_tile_offsets(Subarray * subarray,const std::vector<std::string> * names)223 Status ReaderBase::load_tile_offsets(
224     Subarray* subarray, const std::vector<std::string>* names) {
225   auto timer_se = stats_->start_timer("load_tile_offsets");
226   const auto encryption_key = array_->encryption_key();
227 
228   // Fetch relevant fragments so we load tile offsets only from intersecting
229   // fragments
230   const auto relevant_fragments = subarray->relevant_fragments();
231 
232   bool all_frag = !subarray->is_set();
233 
234   const auto status = parallel_for(
235       storage_manager_->compute_tp(),
236       0,
237       all_frag ? fragment_metadata_.size() : relevant_fragments->size(),
238       [&](const uint64_t i) {
239         auto frag_idx = all_frag ? i : relevant_fragments->at(i);
240         auto& fragment = fragment_metadata_[frag_idx];
241         const auto format_version = fragment->format_version();
242 
243         // Filter the 'names' for format-specific names.
244         std::vector<std::string> filtered_names;
245         filtered_names.reserve(names->size());
246         auto schema = fragment->array_schema();
247         for (const auto& name : *names) {
248           // Applicable for zipped coordinates only to versions < 5
249           if (name == constants::coords && format_version >= 5)
250             continue;
251 
252           // Applicable to separate coordinates only to versions >= 5
253           const auto is_dim = schema->is_dim(name);
254           if (is_dim && format_version < 5)
255             continue;
256 
257           // Not a member of array schema, this field was added in array schema
258           // evolution, ignore for this fragment's tile offsets
259           if (!schema->is_field(name))
260             continue;
261 
262           filtered_names.emplace_back(name);
263         }
264 
265         RETURN_NOT_OK(fragment->load_tile_offsets(
266             *encryption_key, std::move(filtered_names)));
267         return Status::Ok();
268       });
269 
270   RETURN_NOT_OK(status);
271 
272   return Status::Ok();
273 }
274 
load_tile_var_sizes(Subarray * subarray,const std::vector<std::string> * names)275 Status ReaderBase::load_tile_var_sizes(
276     Subarray* subarray, const std::vector<std::string>* names) {
277   auto timer_se = stats_->start_timer("load_tile_var_sizes");
278   const auto encryption_key = array_->encryption_key();
279 
280   // Fetch relevant fragments so we load tile var sizes only from intersecting
281   // fragments
282   const auto relevant_fragments = subarray->relevant_fragments();
283 
284   bool all_frag = !subarray->is_set();
285 
286   const auto status = parallel_for(
287       storage_manager_->compute_tp(),
288       0,
289       all_frag ? fragment_metadata_.size() : relevant_fragments->size(),
290       [&](const uint64_t i) {
291         auto frag_idx = all_frag ? i : relevant_fragments->at(i);
292         auto& fragment = fragment_metadata_[frag_idx];
293 
294         auto schema = fragment->array_schema();
295         for (const auto& name : *names) {
296           // Not a var size attribute.
297           if (!schema->var_size(name))
298             continue;
299 
300           // Not a member of array schema, this field was added in array schema
301           // evolution, ignore for this fragment's tile var sizes.
302           if (!schema->is_field(name))
303             continue;
304 
305           fragment->load_tile_var_sizes(*encryption_key, name);
306         }
307 
308         return Status::Ok();
309       });
310 
311   RETURN_NOT_OK(status);
312 
313   return Status::Ok();
314 }
315 
init_tile(uint32_t format_version,const std::string & name,Tile * tile) const316 Status ReaderBase::init_tile(
317     uint32_t format_version, const std::string& name, Tile* tile) const {
318   // For easy reference
319   auto cell_size = array_schema_->cell_size(name);
320   auto type = array_schema_->type(name);
321   auto is_coords = (name == constants::coords);
322   auto dim_num = (is_coords) ? array_schema_->dim_num() : 0;
323 
324   // Initialize
325   RETURN_NOT_OK(tile->init_filtered(format_version, type, cell_size, dim_num));
326 
327   return Status::Ok();
328 }
329 
init_tile(uint32_t format_version,const std::string & name,Tile * tile,Tile * tile_var) const330 Status ReaderBase::init_tile(
331     uint32_t format_version,
332     const std::string& name,
333     Tile* tile,
334     Tile* tile_var) const {
335   // For easy reference
336   auto type = array_schema_->type(name);
337 
338   // Initialize
339   RETURN_NOT_OK(tile->init_filtered(
340       format_version,
341       constants::cell_var_offset_type,
342       constants::cell_var_offset_size,
343       0));
344   RETURN_NOT_OK(
345       tile_var->init_filtered(format_version, type, datatype_size(type), 0));
346   return Status::Ok();
347 }
348 
init_tile_nullable(uint32_t format_version,const std::string & name,Tile * tile,Tile * tile_validity) const349 Status ReaderBase::init_tile_nullable(
350     uint32_t format_version,
351     const std::string& name,
352     Tile* tile,
353     Tile* tile_validity) const {
354   // For easy reference
355   auto cell_size = array_schema_->cell_size(name);
356   auto type = array_schema_->type(name);
357   auto is_coords = (name == constants::coords);
358   auto dim_num = (is_coords) ? array_schema_->dim_num() : 0;
359 
360   // Initialize
361   RETURN_NOT_OK(tile->init_filtered(format_version, type, cell_size, dim_num));
362   RETURN_NOT_OK(tile_validity->init_filtered(
363       format_version,
364       constants::cell_validity_type,
365       constants::cell_validity_size,
366       0));
367 
368   return Status::Ok();
369 }
370 
init_tile_nullable(uint32_t format_version,const std::string & name,Tile * tile,Tile * tile_var,Tile * tile_validity) const371 Status ReaderBase::init_tile_nullable(
372     uint32_t format_version,
373     const std::string& name,
374     Tile* tile,
375     Tile* tile_var,
376     Tile* tile_validity) const {
377   // For easy reference
378   auto type = array_schema_->type(name);
379 
380   // Initialize
381   RETURN_NOT_OK(tile->init_filtered(
382       format_version,
383       constants::cell_var_offset_type,
384       constants::cell_var_offset_size,
385       0));
386   RETURN_NOT_OK(
387       tile_var->init_filtered(format_version, type, datatype_size(type), 0));
388   RETURN_NOT_OK(tile_validity->init_filtered(
389       format_version,
390       constants::cell_validity_type,
391       constants::cell_validity_size,
392       0));
393   return Status::Ok();
394 }
395 
read_attribute_tiles(const std::vector<std::string> * names,const std::vector<ResultTile * > * result_tiles) const396 Status ReaderBase::read_attribute_tiles(
397     const std::vector<std::string>* names,
398     const std::vector<ResultTile*>* result_tiles) const {
399   auto timer_se = stats_->start_timer("attr_tiles");
400   return read_tiles(names, result_tiles);
401 }
402 
read_coordinate_tiles(const std::vector<std::string> * names,const std::vector<ResultTile * > * result_tiles) const403 Status ReaderBase::read_coordinate_tiles(
404     const std::vector<std::string>* names,
405     const std::vector<ResultTile*>* result_tiles) const {
406   auto timer_se = stats_->start_timer("coord_tiles");
407   return read_tiles(names, result_tiles);
408 }
409 
read_tiles(const std::vector<std::string> * names,const std::vector<ResultTile * > * result_tiles) const410 Status ReaderBase::read_tiles(
411     const std::vector<std::string>* names,
412     const std::vector<ResultTile*>* result_tiles) const {
413   // Shortcut for empty tile vec
414   if (result_tiles->empty())
415     return Status::Ok();
416 
417   // Reading tiles are thread safe. However, we will perform
418   // them on this thread if there is only one read to perform.
419   if (names->size() == 1) {
420     RETURN_NOT_OK(read_tiles(names->at(0), result_tiles));
421   } else {
422     const auto status = parallel_for(
423         storage_manager_->compute_tp(),
424         0,
425         names->size(),
426         [&](const uint64_t i) {
427           RETURN_NOT_OK(read_tiles(names->at(i), result_tiles));
428           return Status::Ok();
429         });
430 
431     RETURN_NOT_OK(status);
432   }
433 
434   return Status::Ok();
435 }
436 
read_tiles(const std::string & name,const std::vector<ResultTile * > * result_tiles) const437 Status ReaderBase::read_tiles(
438     const std::string& name,
439     const std::vector<ResultTile*>* result_tiles) const {
440   // Shortcut for empty tile vec
441   if (result_tiles->empty())
442     return Status::Ok();
443 
444   // Read the tiles asynchronously
445   std::vector<ThreadPool::Task> tasks;
446   RETURN_CANCEL_OR_ERROR(read_tiles(name, result_tiles, &tasks));
447 
448   // Wait for the reads to finish and check statuses.
449   auto statuses = storage_manager_->io_tp()->wait_all_status(tasks);
450   for (const auto& st : statuses)
451     RETURN_CANCEL_OR_ERROR(st);
452 
453   return Status::Ok();
454 }
455 
read_tiles(const std::string & name,const std::vector<ResultTile * > * result_tiles,std::vector<ThreadPool::Task> * const tasks) const456 Status ReaderBase::read_tiles(
457     const std::string& name,
458     const std::vector<ResultTile*>* result_tiles,
459     std::vector<ThreadPool::Task>* const tasks) const {
460   // Shortcut for empty tile vec
461   if (result_tiles->empty())
462     return Status::Ok();
463 
464   // For each tile, read from its fragment.
465   const bool var_size = array_schema_->var_size(name);
466   const bool nullable = array_schema_->is_nullable(name);
467 
468   // Gather the unique fragments indexes for which there are tiles
469   std::unordered_set<uint32_t> fragment_idxs_set;
470   for (const auto& tile : *result_tiles)
471     fragment_idxs_set.emplace(tile->frag_idx());
472 
473   // Put fragment indexes in a vector
474   std::vector<uint32_t> fragment_idxs_vec;
475   fragment_idxs_vec.reserve(fragment_idxs_set.size());
476   for (const auto& idx : fragment_idxs_set)
477     fragment_idxs_vec.emplace_back(idx);
478 
479   // Protect all elements within `result_tiles`.
480   std::unique_lock<std::mutex> ul(result_tiles_mutex_);
481 
482   // Populate the list of regions per file to be read.
483   std::map<URI, std::vector<std::tuple<uint64_t, void*, uint64_t>>> all_regions;
484   for (const auto& tile : *result_tiles) {
485     auto const fragment = fragment_metadata_[tile->frag_idx()];
486     const uint32_t format_version = fragment->format_version();
487 
488     // Applicable for zipped coordinates only to versions < 5
489     if (name == constants::coords && format_version >= 5)
490       continue;
491 
492     // Applicable to separate coordinates only to versions >= 5
493     const bool is_dim = array_schema_->is_dim(name);
494     if (is_dim && format_version < 5)
495       continue;
496 
497     // If the fragment doesn't have the attribute, this is a schema evolution
498     // field and will be treated with fill-in value instead of reading from disk
499     if (!fragment->array_schema()->is_field(name))
500       continue;
501 
502     // Initialize the tile(s)
503     if (is_dim) {
504       const uint64_t dim_num = array_schema_->dim_num();
505       for (uint64_t d = 0; d < dim_num; ++d) {
506         if (array_schema_->dimension(d)->name() == name) {
507           tile->init_coord_tile(name, d);
508           break;
509         }
510       }
511     } else {
512       tile->init_attr_tile(name);
513     }
514 
515     ResultTile::TileTuple* const tile_tuple = tile->tile_tuple(name);
516     assert(tile_tuple != nullptr);
517     Tile* const t = &std::get<0>(*tile_tuple);
518     Tile* const t_var = &std::get<1>(*tile_tuple);
519     Tile* const t_validity = &std::get<2>(*tile_tuple);
520     if (!var_size) {
521       if (nullable)
522         RETURN_NOT_OK(init_tile_nullable(format_version, name, t, t_validity));
523       else
524         RETURN_NOT_OK(init_tile(format_version, name, t));
525     } else {
526       if (nullable)
527         RETURN_NOT_OK(
528             init_tile_nullable(format_version, name, t, t_var, t_validity));
529       else
530         RETURN_NOT_OK(init_tile(format_version, name, t, t_var));
531     }
532 
533     // Get information about the tile in its fragment
534     auto tile_attr_uri = fragment->uri(name);
535     auto tile_idx = tile->tile_idx();
536     uint64_t tile_attr_offset;
537     RETURN_NOT_OK(fragment->file_offset(name, tile_idx, &tile_attr_offset));
538     uint64_t tile_persisted_size;
539     RETURN_NOT_OK(
540         fragment->persisted_tile_size(name, tile_idx, &tile_persisted_size));
541 
542     // Try the cache first.
543     // TODO Parallelize.
544     bool cache_hit;
545     RETURN_NOT_OK(storage_manager_->read_from_cache(
546         tile_attr_uri,
547         tile_attr_offset,
548         t->filtered_buffer(),
549         tile_persisted_size,
550         &cache_hit));
551 
552     if (!cache_hit) {
553       // Add the region of the fragment to be read.
554       RETURN_NOT_OK(t->filtered_buffer()->realloc(tile_persisted_size));
555       t->filtered_buffer()->set_size(tile_persisted_size);
556       t->filtered_buffer()->reset_offset();
557       all_regions[tile_attr_uri].emplace_back(
558           tile_attr_offset, t->filtered_buffer()->data(), tile_persisted_size);
559     }
560 
561     if (var_size) {
562       auto tile_attr_var_uri = fragment->var_uri(name);
563       uint64_t tile_attr_var_offset;
564       RETURN_NOT_OK(
565           fragment->file_var_offset(name, tile_idx, &tile_attr_var_offset));
566       uint64_t tile_var_persisted_size;
567       RETURN_NOT_OK(fragment->persisted_tile_var_size(
568           name, tile_idx, &tile_var_persisted_size));
569 
570       Buffer cached_var_buffer;
571       RETURN_NOT_OK(storage_manager_->read_from_cache(
572           tile_attr_var_uri,
573           tile_attr_var_offset,
574           t_var->filtered_buffer(),
575           tile_var_persisted_size,
576           &cache_hit));
577 
578       if (!cache_hit) {
579         // Add the region of the fragment to be read.
580         RETURN_NOT_OK(
581             t_var->filtered_buffer()->realloc(tile_var_persisted_size));
582         t_var->filtered_buffer()->set_size(tile_var_persisted_size);
583         t_var->filtered_buffer()->reset_offset();
584         all_regions[tile_attr_var_uri].emplace_back(
585             tile_attr_var_offset,
586             t_var->filtered_buffer()->data(),
587             tile_var_persisted_size);
588       }
589     }
590 
591     if (nullable) {
592       auto tile_validity_attr_uri = fragment->validity_uri(name);
593       uint64_t tile_attr_validity_offset;
594       RETURN_NOT_OK(fragment->file_validity_offset(
595           name, tile_idx, &tile_attr_validity_offset));
596       uint64_t tile_validity_persisted_size;
597       RETURN_NOT_OK(fragment->persisted_tile_validity_size(
598           name, tile_idx, &tile_validity_persisted_size));
599 
600       Buffer cached_valdity_buffer;
601       RETURN_NOT_OK(storage_manager_->read_from_cache(
602           tile_validity_attr_uri,
603           tile_attr_validity_offset,
604           t_validity->filtered_buffer(),
605           tile_validity_persisted_size,
606           &cache_hit));
607 
608       if (!cache_hit) {
609         // Add the region of the fragment to be read.
610         RETURN_NOT_OK(t_validity->filtered_buffer()->realloc(
611             tile_validity_persisted_size));
612         t_validity->filtered_buffer()->set_size(tile_validity_persisted_size);
613         t_validity->filtered_buffer()->reset_offset();
614         all_regions[tile_validity_attr_uri].emplace_back(
615             tile_attr_validity_offset,
616             t_validity->filtered_buffer()->data(),
617             tile_validity_persisted_size);
618       }
619     }
620   }
621 
622   // We're done accessing elements within `result_tiles`.
623   ul.unlock();
624 
625   // Do not use the read-ahead cache because tiles will be
626   // cached in the tile cache.
627   const bool use_read_ahead = false;
628 
629   // Enqueue all regions to be read.
630   for (const auto& item : all_regions) {
631     RETURN_NOT_OK(storage_manager_->vfs()->read_all(
632         item.first,
633         item.second,
634         storage_manager_->io_tp(),
635         tasks,
636         use_read_ahead));
637   }
638 
639   return Status::Ok();
640 }
641 
/**
 * Unfilters the tiles of `name` for all given result tiles, in parallel.
 *
 * For each applicable tile, the still-filtered buffers are first written
 * to the tile cache (so later reads of the same region can skip disk
 * I/O), and then the fixed, var-sized and validity tiles are unfiltered
 * as dictated by the attribute's properties.
 *
 * @param name The attribute/dimension to unfilter tiles for.
 * @param result_tiles The result tiles holding the filtered tile data.
 * @return Status
 */
Status ReaderBase::unfilter_tiles(
    const std::string& name,
    const std::vector<ResultTile*>* result_tiles) const {
  auto stat_type = (array_schema_->is_attr(name)) ? "unfilter_attr_tiles" :
                                                    "unfilter_coord_tiles";
  auto timer_se = stats_->start_timer(stat_type);

  auto var_size = array_schema_->var_size(name);
  auto nullable = array_schema_->is_nullable(name);
  auto num_tiles = static_cast<uint64_t>(result_tiles->size());

  // One compute-pool task per result tile.
  auto status = parallel_for(
      storage_manager_->compute_tp(), 0, num_tiles, [&, this](uint64_t i) {
        ResultTile* const tile = result_tiles->at(i);

        auto& fragment = fragment_metadata_[tile->frag_idx()];
        auto format_version = fragment->format_version();

        // Applicable for zipped coordinates only to versions < 5
        // Applicable for separate coordinates only to version >= 5
        // NOTE(review): when the first disjunct fails (name == coords),
        // the second reduces to `format_version < 5` — the repeated name
        // comparison is redundant but harmless.
        if (name != constants::coords ||
            (name == constants::coords && format_version < 5) ||
            (array_schema_->is_dim(name) && format_version >= 5)) {
          auto tile_tuple = tile->tile_tuple(name);

          // Skip non-existent attributes/dimensions (e.g. coords in the
          // dense case).
          if (tile_tuple == nullptr ||
              std::get<0>(*tile_tuple).filtered_buffer()->size() == 0)
            return Status::Ok();

          // Get information about the tile in its fragment.
          auto tile_attr_uri = fragment->uri(name);
          auto tile_idx = tile->tile_idx();
          uint64_t tile_attr_offset;
          RETURN_NOT_OK(
              fragment->file_offset(name, tile_idx, &tile_attr_offset));

          auto& t = std::get<0>(*tile_tuple);
          auto& t_var = std::get<1>(*tile_tuple);
          auto& t_validity = std::get<2>(*tile_tuple);

          // Cache 't' before unfiltering consumes the filtered buffer.
          if (t.filtered()) {
            // Store the filtered buffer in the tile cache.
            RETURN_NOT_OK(storage_manager_->write_to_cache(
                tile_attr_uri, tile_attr_offset, t.filtered_buffer()));
          }

          // Cache 't_var'.
          if (var_size && t_var.filtered()) {
            auto tile_attr_var_uri = fragment->var_uri(name);
            uint64_t tile_attr_var_offset;
            RETURN_NOT_OK(fragment->file_var_offset(
                name, tile_idx, &tile_attr_var_offset));

            // Store the filtered buffer in the tile cache.
            RETURN_NOT_OK(storage_manager_->write_to_cache(
                tile_attr_var_uri,
                tile_attr_var_offset,
                t_var.filtered_buffer()));
          }

          // Cache 't_validity'.
          if (nullable && t_validity.filtered()) {
            auto tile_attr_validity_uri = fragment->validity_uri(name);
            uint64_t tile_attr_validity_offset;
            RETURN_NOT_OK(fragment->file_validity_offset(
                name, tile_idx, &tile_attr_validity_offset));

            // Store the filtered buffer in the tile cache.
            RETURN_NOT_OK(storage_manager_->write_to_cache(
                tile_attr_validity_uri,
                tile_attr_validity_offset,
                t_validity.filtered_buffer()));
          }

          // Unfilter 't' for fixed-sized tiles, otherwise unfilter both 't'
          // and 't_var' for var-sized tiles; include the validity tile for
          // nullable attributes.
          if (!var_size) {
            if (!nullable)
              RETURN_NOT_OK(unfilter_tile(name, &t));
            else
              RETURN_NOT_OK(unfilter_tile_nullable(name, &t, &t_validity));
          } else {
            if (!nullable)
              RETURN_NOT_OK(unfilter_tile(name, &t, &t_var));
            else
              RETURN_NOT_OK(
                  unfilter_tile_nullable(name, &t, &t_var, &t_validity));
          }
        }

        return Status::Ok();
      });

  RETURN_CANCEL_OR_ERROR(status);

  return Status::Ok();
}
742 
unfilter_tile(const std::string & name,Tile * tile) const743 Status ReaderBase::unfilter_tile(const std::string& name, Tile* tile) const {
744   FilterPipeline filters = array_schema_->filters(name);
745 
746   // Append an encryption unfilter when necessary.
747   RETURN_NOT_OK(FilterPipeline::append_encryption_filter(
748       &filters, array_->get_encryption_key()));
749 
750   // Reverse the tile filters.
751   RETURN_NOT_OK(filters.run_reverse(
752       stats_,
753       tile,
754       storage_manager_->compute_tp(),
755       storage_manager_->config()));
756 
757   return Status::Ok();
758 }
759 
unfilter_tile(const std::string & name,Tile * tile,Tile * tile_var) const760 Status ReaderBase::unfilter_tile(
761     const std::string& name, Tile* tile, Tile* tile_var) const {
762   FilterPipeline offset_filters = array_schema_->cell_var_offsets_filters();
763   FilterPipeline filters = array_schema_->filters(name);
764 
765   // Append an encryption unfilter when necessary.
766   RETURN_NOT_OK(FilterPipeline::append_encryption_filter(
767       &offset_filters, array_->get_encryption_key()));
768   RETURN_NOT_OK(FilterPipeline::append_encryption_filter(
769       &filters, array_->get_encryption_key()));
770 
771   // Reverse the tile filters.
772   RETURN_NOT_OK(offset_filters.run_reverse(
773       stats_, tile, storage_manager_->compute_tp(), config_));
774   RETURN_NOT_OK(filters.run_reverse(
775       stats_, tile_var, storage_manager_->compute_tp(), config_));
776 
777   return Status::Ok();
778 }
779 
unfilter_tile_nullable(const std::string & name,Tile * tile,Tile * tile_validity) const780 Status ReaderBase::unfilter_tile_nullable(
781     const std::string& name, Tile* tile, Tile* tile_validity) const {
782   FilterPipeline filters = array_schema_->filters(name);
783   FilterPipeline validity_filters = array_schema_->cell_validity_filters();
784 
785   // Append an encryption unfilter when necessary.
786   RETURN_NOT_OK(FilterPipeline::append_encryption_filter(
787       &filters, array_->get_encryption_key()));
788   RETURN_NOT_OK(FilterPipeline::append_encryption_filter(
789       &validity_filters, array_->get_encryption_key()));
790 
791   // Reverse the tile filters.
792   RETURN_NOT_OK(filters.run_reverse(
793       stats_,
794       tile,
795       storage_manager_->compute_tp(),
796       storage_manager_->config()));
797 
798   // Reverse the validity tile filters.
799   RETURN_NOT_OK(validity_filters.run_reverse(
800       stats_,
801       tile_validity,
802       storage_manager_->compute_tp(),
803       storage_manager_->config()));
804 
805   return Status::Ok();
806 }
807 
unfilter_tile_nullable(const std::string & name,Tile * tile,Tile * tile_var,Tile * tile_validity) const808 Status ReaderBase::unfilter_tile_nullable(
809     const std::string& name,
810     Tile* tile,
811     Tile* tile_var,
812     Tile* tile_validity) const {
813   FilterPipeline offset_filters = array_schema_->cell_var_offsets_filters();
814   FilterPipeline filters = array_schema_->filters(name);
815   FilterPipeline validity_filters = array_schema_->cell_validity_filters();
816 
817   // Append an encryption unfilter when necessary.
818   RETURN_NOT_OK(FilterPipeline::append_encryption_filter(
819       &offset_filters, array_->get_encryption_key()));
820   RETURN_NOT_OK(FilterPipeline::append_encryption_filter(
821       &filters, array_->get_encryption_key()));
822   RETURN_NOT_OK(FilterPipeline::append_encryption_filter(
823       &validity_filters, array_->get_encryption_key()));
824 
825   // Reverse the tile filters.
826   RETURN_NOT_OK(offset_filters.run_reverse(
827       stats_,
828       tile,
829       storage_manager_->compute_tp(),
830       storage_manager_->config()));
831   RETURN_NOT_OK(filters.run_reverse(
832       stats_,
833       tile_var,
834       storage_manager_->compute_tp(),
835       storage_manager_->config()));
836 
837   // Reverse the validity tile filters.
838   RETURN_NOT_OK(validity_filters.run_reverse(
839       stats_,
840       tile_validity,
841       storage_manager_->compute_tp(),
842       storage_manager_->config()));
843 
844   return Status::Ok();
845 }
846 
copy_coordinates(const std::vector<ResultTile * > * result_tiles,std::vector<ResultCellSlab> * result_cell_slabs)847 Status ReaderBase::copy_coordinates(
848     const std::vector<ResultTile*>* result_tiles,
849     std::vector<ResultCellSlab>* result_cell_slabs) {
850   auto timer_se = stats_->start_timer("copy_coordinates");
851 
852   if (result_cell_slabs->empty() && result_tiles->empty()) {
853     zero_out_buffer_sizes();
854     return Status::Ok();
855   }
856 
857   const uint64_t stride = UINT64_MAX;
858 
859   // Build a list of coordinate names to copy, separating them by
860   // whether they are of fixed or variable length. The motivation
861   // is that copying fixed and variable cells require two different
862   // cell slab partitions. Processing them separately allows us to
863   // reduce memory use.
864   std::vector<std::string> fixed_names;
865   std::vector<std::string> var_names;
866 
867   for (const auto& it : buffers_) {
868     const auto& name = it.first;
869     if (copy_overflowed_)
870       break;
871     if (!(name == constants::coords || array_schema_->is_dim(name)))
872       continue;
873 
874     if (array_schema_->var_size(name))
875       var_names.emplace_back(name);
876     else
877       fixed_names.emplace_back(name);
878   }
879 
880   // Copy result cells for fixed-sized coordinates.
881   if (!fixed_names.empty()) {
882     std::vector<size_t> fixed_cs_partitions;
883     compute_fixed_cs_partitions(result_cell_slabs, &fixed_cs_partitions);
884 
885     for (const auto& name : fixed_names) {
886       RETURN_CANCEL_OR_ERROR(copy_fixed_cells(
887           name, stride, result_cell_slabs, &fixed_cs_partitions));
888       if (clear_coords_tiles_on_copy_)
889         clear_tiles(name, result_tiles);
890     }
891   }
892 
893   // Copy result cells for var-sized coordinates.
894   if (!var_names.empty()) {
895     std::vector<std::pair<size_t, size_t>> var_cs_partitions;
896     size_t total_var_cs_length;
897     compute_var_cs_partitions(
898         result_cell_slabs, &var_cs_partitions, &total_var_cs_length);
899 
900     for (const auto& name : var_names) {
901       RETURN_CANCEL_OR_ERROR(copy_var_cells(
902           name,
903           stride,
904           result_cell_slabs,
905           &var_cs_partitions,
906           total_var_cs_length));
907       if (clear_coords_tiles_on_copy_)
908         clear_tiles(name, result_tiles);
909     }
910   }
911 
912   return Status::Ok();
913 }
914 
copy_attribute_values(const uint64_t stride,std::vector<ResultTile * > * result_tiles,std::vector<ResultCellSlab> * result_cell_slabs,Subarray & subarray,uint64_t memory_budget,bool include_dim)915 Status ReaderBase::copy_attribute_values(
916     const uint64_t stride,
917     std::vector<ResultTile*>* result_tiles,
918     std::vector<ResultCellSlab>* result_cell_slabs,
919     Subarray& subarray,
920     uint64_t memory_budget,
921     bool include_dim) {
922   auto timer_se = stats_->start_timer("copy_attr_values");
923 
924   if (result_cell_slabs->empty() && result_tiles->empty()) {
925     zero_out_buffer_sizes();
926     return Status::Ok();
927   }
928 
929   const std::unordered_set<std::string>& condition_names =
930       condition_.field_names();
931 
932   // Build a set of attribute names to process.
933   std::unordered_map<std::string, ProcessTileFlags> names;
934   for (const auto& it : buffers_) {
935     const auto& name = it.first;
936 
937     if (copy_overflowed_) {
938       break;
939     }
940 
941     if (!include_dim &&
942         (name == constants::coords || array_schema_->is_dim(name))) {
943       continue;
944     }
945 
946     // If the query condition has a clause for `name`, we will only
947     // flag it to copy because we have already preloaded the offsets
948     // and read the tiles in `apply_query_condition`.
949     ProcessTileFlags flags = ProcessTileFlag::COPY;
950     if (condition_names.count(name) == 0) {
951       flags |= ProcessTileFlag::READ;
952     }
953 
954     names[name] = flags;
955   }
956 
957   if (!names.empty()) {
958     RETURN_NOT_OK(process_tiles(
959         &names,
960         result_tiles,
961         result_cell_slabs,
962         &subarray,
963         stride,
964         memory_budget,
965         nullptr));
966   }
967 
968   return Status::Ok();
969 }
970 
copy_fixed_cells(const std::string & name,uint64_t stride,const std::vector<ResultCellSlab> * result_cell_slabs,std::vector<size_t> * fixed_cs_partitions)971 Status ReaderBase::copy_fixed_cells(
972     const std::string& name,
973     uint64_t stride,
974     const std::vector<ResultCellSlab>* result_cell_slabs,
975     std::vector<size_t>* fixed_cs_partitions) {
976   auto stat_type = (array_schema_->is_attr(name)) ? "copy_fixed_attr_values" :
977                                                     "copy_fixed_coords";
978   auto timer_se = stats_->start_timer(stat_type);
979 
980   if (result_cell_slabs->empty()) {
981     zero_out_buffer_sizes();
982     return Status::Ok();
983   }
984 
985   auto it = buffers_.find(name);
986   auto buffer_size = it->second.buffer_size_;
987   auto cell_size = array_schema_->cell_size(name);
988 
989   // Precompute the cell range destination offsets in the buffer.
990   uint64_t buffer_offset = 0;
991   auto size = std::min<uint64_t>(result_cell_slabs->size(), copy_end_.first);
992   std::vector<uint64_t> cs_offsets(size);
993   for (uint64_t i = 0; i < cs_offsets.size(); i++) {
994     const auto& cs = result_cell_slabs->at(i);
995 
996     auto cs_length = cs.length_;
997     if (i == copy_end_.first - 1) {
998       cs_length = copy_end_.second;
999     }
1000 
1001     auto bytes_to_copy = cs_length * cell_size;
1002     cs_offsets[i] = buffer_offset;
1003     buffer_offset += bytes_to_copy;
1004   }
1005 
1006   // Handle overflow.
1007   if (buffer_offset > *buffer_size) {
1008     copy_overflowed_ = true;
1009     return Status::Ok();
1010   }
1011 
1012   // Copy result cell slabs in parallel.
1013   std::function<Status(size_t)> copy_fn = std::bind(
1014       &ReaderBase::copy_partitioned_fixed_cells,
1015       this,
1016       std::placeholders::_1,
1017       &name,
1018       stride,
1019       result_cell_slabs,
1020       &cs_offsets,
1021       fixed_cs_partitions);
1022   auto status = parallel_for(
1023       storage_manager_->compute_tp(),
1024       0,
1025       fixed_cs_partitions->size(),
1026       std::move(copy_fn));
1027 
1028   RETURN_NOT_OK(status);
1029 
1030   // Update buffer offsets
1031   *(buffers_[name].buffer_size_) = buffer_offset;
1032   if (array_schema_->is_nullable(name)) {
1033     *(buffers_[name].validity_vector_.buffer_size()) =
1034         (buffer_offset / cell_size) * constants::cell_validity_size;
1035   }
1036 
1037   return Status::Ok();
1038 }
1039 
compute_fixed_cs_partitions(const std::vector<ResultCellSlab> * result_cell_slabs,std::vector<size_t> * fixed_cs_partitions)1040 void ReaderBase::compute_fixed_cs_partitions(
1041     const std::vector<ResultCellSlab>* result_cell_slabs,
1042     std::vector<size_t>* fixed_cs_partitions) {
1043   if (result_cell_slabs->empty()) {
1044     return;
1045   }
1046 
1047   const int num_copy_threads =
1048       storage_manager_->compute_tp()->concurrency_level();
1049 
1050   // Calculate the partition sizes.
1051   auto num_cs = std::min<uint64_t>(result_cell_slabs->size(), copy_end_.first);
1052   const uint64_t num_cs_partitions =
1053       std::min<uint64_t>(num_copy_threads, num_cs);
1054   const uint64_t cs_per_partition = num_cs / num_cs_partitions;
1055   const uint64_t cs_per_partition_carry = num_cs % num_cs_partitions;
1056 
1057   // Calculate the partition offsets.
1058   uint64_t num_cs_partitioned = 0;
1059   fixed_cs_partitions->reserve(num_cs_partitions);
1060   for (uint64_t i = 0; i < num_cs_partitions; ++i) {
1061     const uint64_t num_cs_in_partition =
1062         cs_per_partition + ((i < cs_per_partition_carry) ? 1 : 0);
1063     num_cs_partitioned += num_cs_in_partition;
1064     fixed_cs_partitions->emplace_back(num_cs_partitioned);
1065   }
1066 }
1067 
offsets_bytesize() const1068 uint64_t ReaderBase::offsets_bytesize() const {
1069   assert(offsets_bitsize_ == 32 || offsets_bitsize_ == 64);
1070   return offsets_bitsize_ == 32 ? sizeof(uint32_t) :
1071                                   constants::cell_var_offset_size;
1072 }
1073 
// Copies the fixed-sized cells of one partition of result cell slabs into
// the user buffer for `name`. Partitions write disjoint byte ranges (given
// by `cs_offsets`), so the partitions can be processed concurrently.
Status ReaderBase::copy_partitioned_fixed_cells(
    const size_t partition_idx,
    const std::string* const name,
    const uint64_t stride,
    const std::vector<ResultCellSlab>* const result_cell_slabs,
    const std::vector<uint64_t>* cs_offsets,
    const std::vector<size_t>* cs_partitions) {
  assert(name);
  assert(result_cell_slabs);

  // For easy reference.
  auto nullable = array_schema_->is_nullable(*name);
  auto it = buffers_.find(*name);
  auto buffer = (unsigned char*)it->second.buffer_;
  auto buffer_validity = (unsigned char*)it->second.validity_vector_.buffer();
  auto cell_size = array_schema_->cell_size(*name);
  // Fill values are only defined for attributes (used below for cells that
  // have no backing tile, e.g. after schema evolution).
  ByteVecValue fill_value;
  uint8_t fill_value_validity = 0;
  if (array_schema_->is_attr(*name)) {
    fill_value = array_schema_->attribute(*name)->fill_value();
    fill_value_validity =
        array_schema_->attribute(*name)->fill_value_validity();
  }
  uint64_t fill_value_size = (uint64_t)fill_value.size();

  // Calculate the partition to operate on. `cs_partitions` stores
  // cumulative (exclusive-end) slab counts, so the previous entry is this
  // partition's inclusive start.
  const uint64_t cs_idx_start =
      partition_idx == 0 ? 0 : cs_partitions->at(partition_idx - 1);
  const uint64_t cs_idx_end = cs_partitions->at(partition_idx);

  // Copy the cells.
  for (uint64_t cs_idx = cs_idx_start; cs_idx < cs_idx_end; ++cs_idx) {
    // If there was an overflow and we fixed it, don't copy past the new
    // result cell slabs.
    if (cs_idx >= copy_end_.first) {
      break;
    }

    const auto& cs = (*result_cell_slabs)[cs_idx];
    uint64_t offset = cs_offsets->at(cs_idx);

    // The final slab to copy may be truncated by `copy_end_`.
    auto cs_length = cs.length_;
    if (cs_idx == copy_end_.first - 1) {
      cs_length = copy_end_.second;
    }

    // Copy

    // First we check if this is an older (pre TileDB 2.0) array with zipped
    // coordinates and the user has requested split buffer if so we should
    // proceed to copying the tile If not, and there is no tile or the tile is
    // empty for the field then this is a read of an older fragment in schema
    // evolution. In that case we want to set the field to fill values for this
    // for this tile.
    const bool split_buffer_for_zipped_coords =
        array_schema_->is_dim(*name) && cs.tile_->stores_zipped_coords();
    if ((cs.tile_ == nullptr || cs.tile_->tile_tuple(*name) == nullptr) &&
        !split_buffer_for_zipped_coords) {  // Empty range or attributed added
                                            // in schema evolution
      // Pad the destination range with copies of the fill value (and the
      // fill validity byte, when the field is nullable).
      auto bytes_to_copy = cs_length * cell_size;
      auto fill_num = bytes_to_copy / fill_value_size;
      for (uint64_t j = 0; j < fill_num; ++j) {
        std::memcpy(buffer + offset, fill_value.data(), fill_value_size);
        if (nullable) {
          std::memset(
              buffer_validity +
                  (offset / cell_size * constants::cell_validity_size),
              fill_value_validity,
              constants::cell_validity_size);
        }
        offset += fill_value_size;
      }
    } else {  // Non-empty range
      // A stride of UINT64_MAX means the slab is contiguous and can be read
      // in one call; otherwise read cell-by-cell, advancing by `stride`.
      if (stride == UINT64_MAX) {
        if (!nullable)
          RETURN_NOT_OK(
              cs.tile_->read(*name, buffer, offset, cs.start_, cs_length));
        else
          RETURN_NOT_OK(cs.tile_->read_nullable(
              *name, buffer, offset, cs.start_, cs_length, buffer_validity));
      } else {
        auto cell_offset = offset;
        auto start = cs.start_;
        for (uint64_t j = 0; j < cs_length; ++j) {
          if (!nullable)
            RETURN_NOT_OK(cs.tile_->read(*name, buffer, cell_offset, start, 1));
          else
            RETURN_NOT_OK(cs.tile_->read_nullable(
                *name, buffer, cell_offset, start, 1, buffer_validity));
          cell_offset += cell_size;
          start += stride;
        }
      }
    }
  }

  return Status::Ok();
}
1172 
copy_var_cells(const std::string & name,const uint64_t stride,std::vector<ResultCellSlab> * result_cell_slabs,std::vector<std::pair<size_t,size_t>> * var_cs_partitions,size_t total_cs_length)1173 Status ReaderBase::copy_var_cells(
1174     const std::string& name,
1175     const uint64_t stride,
1176     std::vector<ResultCellSlab>* result_cell_slabs,
1177     std::vector<std::pair<size_t, size_t>>* var_cs_partitions,
1178     size_t total_cs_length) {
1179   auto stat_type = (array_schema_->is_attr(name)) ? "copy_var_attr_values" :
1180                                                     "copy_var_coords";
1181   auto timer_se = stats_->start_timer(stat_type);
1182 
1183   if (result_cell_slabs->empty()) {
1184     zero_out_buffer_sizes();
1185     return Status::Ok();
1186   }
1187 
1188   std::vector<uint64_t> offset_offsets_per_cs(total_cs_length);
1189   std::vector<uint64_t> var_offsets_per_cs(total_cs_length);
1190 
1191   // Compute the destinations of offsets and var-len data in the buffers.
1192   uint64_t total_offset_size, total_var_size, total_validity_size;
1193   RETURN_NOT_OK(compute_var_cell_destinations(
1194       name,
1195       stride,
1196       result_cell_slabs,
1197       &offset_offsets_per_cs,
1198       &var_offsets_per_cs,
1199       &total_offset_size,
1200       &total_var_size,
1201       &total_validity_size));
1202 
1203   // Check for overflow and return early (without copying) in that case.
1204   if (copy_overflowed_) {
1205     return Status::Ok();
1206   }
1207 
1208   // Copy result cell slabs in parallel
1209   std::function<Status(size_t)> copy_fn = std::bind(
1210       &ReaderBase::copy_partitioned_var_cells,
1211       this,
1212       std::placeholders::_1,
1213       &name,
1214       stride,
1215       result_cell_slabs,
1216       &offset_offsets_per_cs,
1217       &var_offsets_per_cs,
1218       var_cs_partitions);
1219   auto status = parallel_for(
1220       storage_manager_->compute_tp(), 0, var_cs_partitions->size(), copy_fn);
1221 
1222   RETURN_NOT_OK(status);
1223 
1224   // Update buffer offsets
1225   *(buffers_[name].buffer_size_) = total_offset_size;
1226   *(buffers_[name].buffer_var_size_) = total_var_size;
1227   if (array_schema_->is_nullable(name))
1228     *(buffers_[name].validity_vector_.buffer_size()) = total_validity_size;
1229 
1230   return Status::Ok();
1231 }
1232 
// Partitions the result cell slabs (truncated by `copy_end_`) for the
// var-sized copy path. Unlike the fixed path, each boundary also records
// the cumulative cell count of the preceding partitions, which the copy
// uses to index the per-cell destination vectors.
void ReaderBase::compute_var_cs_partitions(
    const std::vector<ResultCellSlab>* result_cell_slabs,
    std::vector<std::pair<size_t, size_t>>* var_cs_partitions,
    size_t* total_var_cs_length) {
  if (result_cell_slabs->empty()) {
    return;
  }

  const int num_copy_threads =
      storage_manager_->compute_tp()->concurrency_level();

  // Calculate the partition range.
  const uint64_t num_cs =
      std::min<uint64_t>(result_cell_slabs->size(), copy_end_.first);
  const uint64_t num_cs_partitions =
      std::min<uint64_t>(num_copy_threads, num_cs);
  const uint64_t cs_per_partition = num_cs / num_cs_partitions;
  const uint64_t cs_per_partition_carry = num_cs % num_cs_partitions;

  // Compute the boundary between each partition. Each boundary
  // is represented by an `std::pair` that contains the total
  // length of each cell slab in the leading partition and an
  // exclusive cell slab index that ends the partition.
  uint64_t next_partition_idx = cs_per_partition;
  if (cs_per_partition_carry > 0)
    ++next_partition_idx;

  *total_var_cs_length = 0;
  var_cs_partitions->reserve(num_cs_partitions);
  for (uint64_t cs_idx = 0; cs_idx < num_cs; cs_idx++) {
    if (cs_idx == next_partition_idx) {
      var_cs_partitions->emplace_back(*total_var_cs_length, cs_idx);

      // The final partition may contain extra cell slabs that did
      // not evenly divide into the partition range. Set the
      // `next_partition_idx` to zero and build the last boundary
      // after this for-loop.
      if (var_cs_partitions->size() == num_cs_partitions) {
        next_partition_idx = 0;
      } else {
        next_partition_idx += cs_per_partition;
        // NOTE(review): when `cs_per_partition_carry == 0`, the unsigned
        // expression `cs_per_partition_carry - 1` wraps to UINT64_MAX, so
        // this condition is always true and partitions come out uneven
        // (and their count may differ from `num_cs_partitions`). The copy
        // appears unaffected since callers iterate the actual size of
        // `var_cs_partitions` — confirm, and consider mirroring the carry
        // distribution used in `compute_fixed_cs_partitions`.
        if (cs_idx < (cs_per_partition_carry - 1))
          ++next_partition_idx;
      }
    }

    // Accumulate the cell count, honoring the `copy_end_` truncation of
    // the final slab.
    auto cs_length = result_cell_slabs->at(cs_idx).length_;
    if (cs_idx == copy_end_.first - 1) {
      cs_length = copy_end_.second;
    }
    *total_var_cs_length += cs_length;
  }

  // Store the final boundary.
  var_cs_partitions->emplace_back(*total_var_cs_length, num_cs);
}
1289 
// Computes, for every cell in the (possibly `copy_end_`-truncated) result
// cell slabs, the destination offsets of its offset value and var-sized
// value in the user buffers. On overflow it either shrinks `copy_end_`
// (when `fix_var_sized_overflows_` is set) or flags `copy_overflowed_`.
Status ReaderBase::compute_var_cell_destinations(
    const std::string& name,
    uint64_t stride,
    std::vector<ResultCellSlab>* result_cell_slabs,
    std::vector<uint64_t>* offset_offsets_per_cs,
    std::vector<uint64_t>* var_offsets_per_cs,
    uint64_t* total_offset_size,
    uint64_t* total_var_size,
    uint64_t* total_validity_size) {
  // For easy reference
  auto nullable = array_schema_->is_nullable(name);
  auto num_cs = std::min<uint64_t>(result_cell_slabs->size(), copy_end_.first);
  auto offset_size = offsets_bytesize();
  // Fill values are only defined for attributes; cells with no backing tile
  // are sized as one fill value each.
  ByteVecValue fill_value;
  if (array_schema_->is_attr(name))
    fill_value = array_schema_->attribute(name)->fill_value();
  auto fill_value_size = (uint64_t)fill_value.size();

  auto it = buffers_.find(name);
  auto buffer_size = *it->second.buffer_size_;
  auto buffer_var_size = *it->second.buffer_var_size_;
  auto buffer_validity_size = it->second.validity_vector_.buffer_size();

  // Reserve room for the extra trailing offset, if configured.
  if (offsets_extra_element_)
    buffer_size -= offset_size;

  // Compute the destinations for all result cell slabs
  *total_offset_size = 0;
  *total_var_size = 0;
  *total_validity_size = 0;
  size_t total_cs_length = 0;
  for (uint64_t cs_idx = 0; cs_idx < num_cs; cs_idx++) {
    const auto& cs = result_cell_slabs->at(cs_idx);

    // The final slab may be truncated by `copy_end_`.
    auto cs_length = cs.length_;
    if (cs_idx == copy_end_.first - 1) {
      cs_length = copy_end_.second;
    }

    // Get tile information, if the range is nonempty.
    uint64_t* tile_offsets = nullptr;
    uint64_t tile_cell_num = 0;
    uint64_t tile_var_size = 0;
    if (cs.tile_ != nullptr && cs.tile_->tile_tuple(name) != nullptr) {
      const auto tile_tuple = cs.tile_->tile_tuple(name);
      const auto& tile = std::get<0>(*tile_tuple);
      const auto& tile_var = std::get<1>(*tile_tuple);

      // Get the internal buffer to the offset values.
      // NOTE: the offsets tile is always interpreted as 64-bit values here,
      // independent of the user-facing `offsets_bitsize_`.
      Buffer* const buffer = tile.buffer();

      tile_offsets = (uint64_t*)buffer->data();
      tile_cell_num = tile.cell_num();
      tile_var_size = tile_var.size();
    }

    // Compute the destinations for each cell in the range.
    uint64_t dest_vec_idx = 0;
    stride = (stride == UINT64_MAX) ? 1 : stride;

    for (auto cell_idx = cs.start_; dest_vec_idx < cs_length;
         cell_idx += stride, dest_vec_idx++) {
      // Get size of variable-sized cell: the gap to the next offset, or,
      // for the last cell of the tile, the remainder of the var data.
      uint64_t cell_var_size = 0;
      if (cs.tile_ == nullptr || cs.tile_->tile_tuple(name) == nullptr) {
        cell_var_size = fill_value_size;
      } else {
        cell_var_size =
            (cell_idx != tile_cell_num - 1) ?
                tile_offsets[cell_idx + 1] - tile_offsets[cell_idx] :
                tile_var_size - (tile_offsets[cell_idx] - tile_offsets[0]);
      }

      // Would this cell overflow any of the three user buffers?
      if (*total_offset_size + offset_size > buffer_size ||
          *total_var_size + cell_var_size > buffer_var_size ||
          (buffer_validity_size &&
           *total_validity_size + constants::cell_validity_size >
               *buffer_validity_size)) {
        // Try to fix the overflow by reducing the result cell slabs to copy.
        if (fix_var_sized_overflows_) {
          copy_end_ =
              std::pair<uint64_t, uint64_t>(cs_idx + 1, cell_idx - cs.start_);

          // Cannot even copy one cell, return overflow.
          if (cs_idx == 0 && cell_idx == result_cell_slabs->front().start_) {
            copy_overflowed_ = true;
          }
        } else {
          copy_overflowed_ = true;
        }

        // In case an extra offset is configured, we need to account memory for
        // it on each read
        *total_offset_size += offsets_extra_element_ ? offset_size : 0;

        return Status::Ok();
      }

      // Record destination offsets.
      (*offset_offsets_per_cs)[total_cs_length + dest_vec_idx] =
          *total_offset_size;
      (*var_offsets_per_cs)[total_cs_length + dest_vec_idx] = *total_var_size;
      *total_offset_size += offset_size;
      *total_var_size += cell_var_size;
      if (nullable)
        *total_validity_size += constants::cell_validity_size;
    }

    total_cs_length += cs_length;
  }

  // In case an extra offset is configured, we need to account memory for it on
  // each read
  *total_offset_size += offsets_extra_element_ ? offset_size : 0;

  return Status::Ok();
}
1407 
// Copies the var-sized cells of one partition of result cell slabs into
// the user offset/var/validity buffers for `name`, using the destinations
// precomputed by `compute_var_cell_destinations`. Partitions write
// disjoint ranges, so they can be processed concurrently.
Status ReaderBase::copy_partitioned_var_cells(
    const size_t partition_idx,
    const std::string* const name,
    uint64_t stride,
    const std::vector<ResultCellSlab>* const result_cell_slabs,
    const std::vector<uint64_t>* const offset_offsets_per_cs,
    const std::vector<uint64_t>* const var_offsets_per_cs,
    const std::vector<std::pair<size_t, size_t>>* const cs_partitions) {
  assert(name);
  assert(result_cell_slabs);

  auto it = buffers_.find(*name);
  auto nullable = array_schema_->is_nullable(*name);
  auto buffer = (unsigned char*)it->second.buffer_;
  auto buffer_var = (unsigned char*)it->second.buffer_var_;
  auto buffer_validity = (unsigned char*)it->second.validity_vector_.buffer();
  auto offset_size = offsets_bytesize();
  // Fill values are only defined for attributes (used for cells that have
  // no backing tile, e.g. after schema evolution).
  ByteVecValue fill_value;
  uint8_t fill_value_validity = 0;
  if (array_schema_->is_attr(*name)) {
    fill_value = array_schema_->attribute(*name)->fill_value();
    fill_value_validity =
        array_schema_->attribute(*name)->fill_value_validity();
  }
  auto fill_value_size = (uint64_t)fill_value.size();
  auto attr_datatype_size = datatype_size(array_schema_->type(*name));

  // Fetch the starting array offset into both `offset_offsets_per_cs`
  // and `var_offsets_per_cs`.
  size_t arr_offset =
      partition_idx == 0 ? 0 : (*cs_partitions)[partition_idx - 1].first;

  // Fetch the inclusive starting cell slab index and the exclusive ending
  // cell slab index.
  const size_t start_cs_idx =
      partition_idx == 0 ? 0 : (*cs_partitions)[partition_idx - 1].second;
  const size_t end_cs_idx = (*cs_partitions)[partition_idx].second;

  // Copy all cells within the range of cell slabs.
  for (uint64_t cs_idx = start_cs_idx; cs_idx < end_cs_idx; ++cs_idx) {
    // If there was an overflow and we fixed it, don't copy past the new
    // result cell slabs.
    if (cs_idx >= copy_end_.first) {
      break;
    }
    const auto& cs = (*result_cell_slabs)[cs_idx];

    // The final slab may be truncated by `copy_end_`.
    auto cs_length = cs.length_;
    if (cs_idx == copy_end_.first - 1) {
      cs_length = copy_end_.second;
    }

    // Get tile information, if the range is nonempty.
    uint64_t* tile_offsets = nullptr;
    Tile* tile_var = nullptr;
    Tile* tile_validity = nullptr;
    uint64_t tile_cell_num = 0;
    if (cs.tile_ != nullptr && cs.tile_->tile_tuple(*name) != nullptr) {
      const auto tile_tuple = cs.tile_->tile_tuple(*name);
      Tile* const tile = &std::get<0>(*tile_tuple);
      tile_var = &std::get<1>(*tile_tuple);
      tile_validity = &std::get<2>(*tile_tuple);

      // Get the internal buffer to the offset values.
      // NOTE: the offsets tile is interpreted as 64-bit values here,
      // independent of the user-facing `offsets_bitsize_`.
      Buffer* const buffer = tile->buffer();

      tile_offsets = (uint64_t*)buffer->data();
      tile_cell_num = tile->cell_num();
    }

    // Copy each cell in the range
    uint64_t dest_vec_idx = 0;
    stride = (stride == UINT64_MAX) ? 1 : stride;
    for (auto cell_idx = cs.start_; dest_vec_idx < cs_length;
         cell_idx += stride, dest_vec_idx++) {
      auto offset_offsets = (*offset_offsets_per_cs)[arr_offset + dest_vec_idx];
      auto offset_dest = buffer + offset_offsets;
      auto var_offset = (*var_offsets_per_cs)[arr_offset + dest_vec_idx];
      auto var_dest = buffer_var + var_offset;
      // The validity byte index equals the cell's ordinal position, i.e.
      // its offset position divided by the offset size.
      auto validity_dest = buffer_validity + (offset_offsets / offset_size);

      // In "elements" mode, offsets are reported in units of the attribute
      // datatype rather than bytes.
      if (offsets_format_mode_ == "elements") {
        var_offset = var_offset / attr_datatype_size;
      }

      // Copy offset
      std::memcpy(offset_dest, &var_offset, offset_size);

      // Copy variable-sized value
      if (cs.tile_ == nullptr || cs.tile_->tile_tuple(*name) == nullptr) {
        // No backing tile: write the fill value (and fill validity).
        std::memcpy(var_dest, fill_value.data(), fill_value_size);
        if (nullable)
          std::memset(
              validity_dest,
              fill_value_validity,
              constants::cell_validity_size);
      } else {
        // Cell size is the gap to the next offset, or, for the last cell
        // of the tile, the remainder of the var data.
        const uint64_t cell_var_size =
            (cell_idx != tile_cell_num - 1) ?
                tile_offsets[cell_idx + 1] - tile_offsets[cell_idx] :
                tile_var->size() - (tile_offsets[cell_idx] - tile_offsets[0]);
        const uint64_t tile_var_offset =
            tile_offsets[cell_idx] - tile_offsets[0];

        RETURN_NOT_OK(tile_var->read(var_dest, cell_var_size, tile_var_offset));

        if (nullable)
          RETURN_NOT_OK(tile_validity->read(
              validity_dest, constants::cell_validity_size, cell_idx));
      }
    }

    arr_offset += cs_length;
  }

  return Status::Ok();
}
1525 
// Reads, unfilters and/or copies the tiles of the attributes/dimensions in
// `names`, per each name's ProcessTileFlags. When `memory_budget` is set
// (not UINT64_MAX), the set of result tiles is truncated so the loaded
// tiles fit in the budget; `*memory_used_for_tiles` is incremented by the
// tile memory consumed when tiles are read purely for a query condition.
Status ReaderBase::process_tiles(
    const std::unordered_map<std::string, ProcessTileFlags>* names,
    std::vector<ResultTile*>* result_tiles,
    std::vector<ResultCellSlab>* result_cell_slabs,
    Subarray* subarray,
    const uint64_t stride,
    uint64_t memory_budget,
    uint64_t* memory_used_for_tiles) {
  // If a name needs to be read, we put it on `read_names` vector (it may
  // contain other flags). Otherwise, we put the name on the `copy_names`
  // vector if it needs to be copied back to the user buffer.
  // We can benefit from concurrent reads by processing `read_names`
  // separately from `copy_names`.
  std::vector<std::string> read_names;
  std::vector<std::string> copy_names;
  // A READ-only name set means the tiles are fetched solely to evaluate a
  // query condition (nothing is copied into user buffers).
  bool is_apply_query_condition = true;
  read_names.reserve(names->size());
  for (const auto& name_pair : *names) {
    const std::string name = name_pair.first;
    const ProcessTileFlags flags = name_pair.second;
    if (flags & ProcessTileFlag::READ) {
      if (flags & ProcessTileFlag::COPY)
        is_apply_query_condition = false;
      read_names.push_back(name);
    } else if (flags & ProcessTileFlag::COPY) {
      copy_names.push_back(name);
      is_apply_query_condition = false;
    }
  }

  // Pre-load all attribute offsets into memory for attributes
  // to be read.
  RETURN_NOT_OK(load_tile_offsets(subarray, &read_names));

  // Respect the memory budget if it is set.
  if (memory_budget != std::numeric_limits<uint64_t>::max()) {
    // For query condition, make sure all tiles fit in memory budget.
    if (is_apply_query_condition) {
      for (const auto& name_pair : *names) {
        const std::string name = name_pair.first;
        assert(name_pair.second == ProcessTileFlag::READ);

        for (const auto& rt : *result_tiles) {
          uint64_t tile_size = 0;
          RETURN_NOT_OK(get_attribute_tile_size(
              name, rt->frag_idx(), rt->tile_idx(), &tile_size));
          *memory_used_for_tiles += tile_size;
        }
      }

      if (*memory_used_for_tiles > memory_budget) {
        return Status::ReaderError(
            "Exceeded tile memory budget applying query condition");
      }
    } else {
      // Make sure that for each attribute, all tiles can fit in the budget.
      // `new_result_tile_size` shrinks to the largest tile-count prefix
      // that fits for every attribute considered so far.
      uint64_t new_result_tile_size = result_tiles->size();
      for (const auto& name_pair : *names) {
        uint64_t mem_usage = 0;
        for (uint64_t i = 0;
             i < result_tiles->size() && i < new_result_tile_size;
             i++) {
          const auto& rt = result_tiles->at(i);
          uint64_t tile_size = 0;
          RETURN_NOT_OK(get_attribute_tile_size(
              name_pair.first, rt->frag_idx(), rt->tile_idx(), &tile_size));
          if (mem_usage + tile_size > memory_budget) {
            new_result_tile_size = i;
            break;
          }
          mem_usage += tile_size;
        }
      }

      if (new_result_tile_size != result_tiles->size()) {
        // Erase tiles from result tiles.
        // NOTE(review): erasing from `new_result_tile_size - 1` drops one
        // more tile than the budget loop above determined could fit —
        // confirm the off-by-one is intentional.
        result_tiles->erase(
            result_tiles->begin() + new_result_tile_size - 1,
            result_tiles->end());

        // Find the result cell slab index to end the copy operation.
        auto& last_tile = result_tiles->back();
        uint64_t last_idx = 0;
        for (uint64_t i = 0; i < result_cell_slabs->size(); i++) {
          if (result_cell_slabs->at(i).tile_ == last_tile) {
            if (i == 0) {
              return Status::ReaderError(
                  "Unable to copy one tile with current budget");
            }
            // Keep the last slab index that references the last kept tile.
            last_idx = i;
          }
        }

        // Adjust copy_end_
        if (copy_end_.first > last_idx + 1) {
          copy_end_.first = last_idx + 1;
          copy_end_.second = result_cell_slabs->at(last_idx).length_;
        }
      }
    }
  }

  // Get the maximum number of attributes to read and unfilter in parallel.
  // Each attribute requires additional memory to buffer reads into
  // before copying them back into `buffers_`. Cells must be copied
  // before moving onto the next set of concurrent reads to prevent
  // bloating memory. Additionally, the copy cells paths are performed
  // in serial, which will bottleneck the read concurrency. Increasing
  // this number will have diminishing returns on performance.
  const uint64_t concurrent_reads = constants::concurrent_attr_reads;

  // Instantiate partitions for copying fixed and variable cells.
  std::vector<size_t> fixed_cs_partitions;
  compute_fixed_cs_partitions(result_cell_slabs, &fixed_cs_partitions);

  std::vector<std::pair<size_t, size_t>> var_cs_partitions;
  size_t total_var_cs_length;
  compute_var_cs_partitions(
      result_cell_slabs, &var_cs_partitions, &total_var_cs_length);

  // Handle attribute/dimensions that need to be copied but do
  // not need to be read.
  for (const auto& copy_name : copy_names) {
    if (!array_schema_->var_size(copy_name))
      RETURN_CANCEL_OR_ERROR(copy_fixed_cells(
          copy_name, stride, result_cell_slabs, &fixed_cs_partitions));
    else
      RETURN_CANCEL_OR_ERROR(copy_var_cells(
          copy_name,
          stride,
          result_cell_slabs,
          &var_cs_partitions,
          total_var_cs_length));

    // Copy only here should be attributes in the query condition. They should
    // be treated as coordinates.
    if (clear_coords_tiles_on_copy_)
      clear_tiles(copy_name, result_tiles);
  }

  // Iterate through all of the attribute names. This loop
  // will read, unfilter, and copy tiles back into the `buffers_`.
  uint64_t idx = 0;
  // NOTE(review): `rcs_index` is never used in this function — candidate
  // for removal.
  tdb_unique_ptr<ResultCellSlabsIndex> rcs_index = nullptr;
  while (idx < read_names.size()) {
    // We will perform `concurrent_reads` unless we have a smaller
    // number of remaining attributes to process.
    const uint64_t num_reads =
        std::min(concurrent_reads, read_names.size() - idx);

    // Build a vector of the attribute names to process.
    std::vector<std::string> inner_names(
        read_names.begin() + idx, read_names.begin() + idx + num_reads);

    // Read the tiles for the names in `inner_names`. Each attribute
    // name will be read concurrently.
    RETURN_CANCEL_OR_ERROR(read_attribute_tiles(&inner_names, result_tiles));

    // Copy the cells into the associated `buffers_`, and then clear the cells
    // from the tiles. The cell copies are not thread safe. Clearing tiles are
    // thread safe, but quick enough that they do not justify scheduling on
    // separate threads.
    for (const auto& inner_name : inner_names) {
      const ProcessTileFlags flags = names->at(inner_name);

      RETURN_CANCEL_OR_ERROR(unfilter_tiles(inner_name, result_tiles));

      if (flags & ProcessTileFlag::COPY) {
        if (!array_schema_->var_size(inner_name)) {
          RETURN_CANCEL_OR_ERROR(copy_fixed_cells(
              inner_name, stride, result_cell_slabs, &fixed_cs_partitions));
        } else {
          RETURN_CANCEL_OR_ERROR(copy_var_cells(
              inner_name,
              stride,
              result_cell_slabs,
              &var_cs_partitions,
              total_var_cs_length));
        }
        clear_tiles(inner_name, result_tiles);
      }
    }

    idx += inner_names.size();
  }

  return Status::Ok();
}
1714 
compute_rcs_index(const std::vector<ResultCellSlab> * result_cell_slabs) const1715 tdb_unique_ptr<ReaderBase::ResultCellSlabsIndex> ReaderBase::compute_rcs_index(
1716     const std::vector<ResultCellSlab>* result_cell_slabs) const {
1717   // Build an association from the result tile to the cell slab ranges
1718   // that it contains.
1719   tdb_unique_ptr<ReaderBase::ResultCellSlabsIndex> rcs_index =
1720       tdb_unique_ptr<ReaderBase::ResultCellSlabsIndex>(
1721           tdb_new(ResultCellSlabsIndex));
1722 
1723   std::vector<ResultCellSlab>::const_iterator it = result_cell_slabs->cbegin();
1724   while (it != result_cell_slabs->cend()) {
1725     std::pair<uint64_t, uint64_t> range =
1726         std::make_pair(it->start_, it->start_ + it->length_);
1727     if (rcs_index->find(it->tile_) == rcs_index->end()) {
1728       std::vector<std::pair<uint64_t, uint64_t>> ranges(1, std::move(range));
1729       rcs_index->insert(std::make_pair(it->tile_, std::move(ranges)));
1730     } else {
1731       (*rcs_index)[it->tile_].emplace_back(std::move(range));
1732     }
1733     ++it;
1734   }
1735 
1736   return rcs_index;
1737 }
1738 
apply_query_condition(std::vector<ResultCellSlab> * const result_cell_slabs,std::vector<ResultTile * > * result_tiles,Subarray * subarray,uint64_t stride,uint64_t memory_budget_rcs,uint64_t memory_budget_tiles,uint64_t * memory_used_for_tiles)1739 Status ReaderBase::apply_query_condition(
1740     std::vector<ResultCellSlab>* const result_cell_slabs,
1741     std::vector<ResultTile*>* result_tiles,
1742     Subarray* subarray,
1743     uint64_t stride,
1744     uint64_t memory_budget_rcs,
1745     uint64_t memory_budget_tiles,
1746     uint64_t* memory_used_for_tiles) {
1747   if (condition_.empty() || result_cell_slabs->empty())
1748     return Status::Ok();
1749 
1750   // To evaluate the query condition, we need to read tiles for the
1751   // attributes used in the query condition. Build a map of attribute
1752   // names to read.
1753   const std::unordered_set<std::string>& condition_names =
1754       condition_.field_names();
1755   std::unordered_map<std::string, ProcessTileFlags> names;
1756   for (const auto& condition_name : condition_names) {
1757     names[condition_name] = ProcessTileFlag::READ;
1758   }
1759 
1760   // Each element in `names` has been flagged with `ProcessTileFlag::READ`.
1761   // This will read the tiles, but will not copy them into the user buffers.
1762   RETURN_NOT_OK(process_tiles(
1763       &names,
1764       result_tiles,
1765       result_cell_slabs,
1766       subarray,
1767       stride,
1768       memory_budget_tiles,
1769       memory_used_for_tiles));
1770 
1771   // The `UINT64_MAX` is a sentinel value to indicate that we do not
1772   // use a stride in the cell index calculation. To simplify our logic,
1773   // assign this to `1`.
1774   if (stride == UINT64_MAX)
1775     stride = 1;
1776 
1777   RETURN_NOT_OK(condition_.apply(
1778       array_schema_, result_cell_slabs, stride, memory_budget_rcs));
1779 
1780   logger_->debug("Done applying query condition");
1781 
1782   return Status::Ok();
1783 }
1784 
get_attribute_tile_size(const std::string & name,unsigned f,uint64_t t,uint64_t * tile_size)1785 Status ReaderBase::get_attribute_tile_size(
1786     const std::string& name, unsigned f, uint64_t t, uint64_t* tile_size) {
1787   *tile_size = 0;
1788   *tile_size += fragment_metadata_[f]->tile_size(name, t);
1789 
1790   if (array_schema_->var_size(name)) {
1791     uint64_t temp = 0;
1792     RETURN_NOT_OK(fragment_metadata_[f]->tile_var_size(name, t, &temp));
1793     *tile_size += temp;
1794   }
1795 
1796   if (array_schema_->is_nullable(name)) {
1797     *tile_size +=
1798         fragment_metadata_[f]->cell_num(t) * constants::cell_validity_size;
1799   }
1800 
1801   return Status::Ok();
1802 }
1803 
1804 template <class T>
compute_result_space_tiles(const Subarray * subarray,const Subarray * partitioner_subarray,std::map<const T *,ResultSpaceTile<T>> * result_space_tiles) const1805 void ReaderBase::compute_result_space_tiles(
1806     const Subarray* subarray,
1807     const Subarray* partitioner_subarray,
1808     std::map<const T*, ResultSpaceTile<T>>* result_space_tiles) const {
1809   // For easy reference
1810   auto domain = array_schema_->domain()->domain();
1811   auto tile_extents = array_schema_->domain()->tile_extents();
1812   auto tile_order = array_schema_->tile_order();
1813 
1814   // Compute fragment tile domains
1815   std::vector<TileDomain<T>> frag_tile_domains;
1816 
1817   if (partitioner_subarray->is_set()) {
1818     auto relevant_frags = partitioner_subarray->relevant_fragments();
1819     for (auto it = relevant_frags->rbegin(); it != relevant_frags->rend();
1820          it++) {
1821       if (fragment_metadata_[*it]->dense()) {
1822         frag_tile_domains.emplace_back(
1823             *it,
1824             domain,
1825             fragment_metadata_[*it]->non_empty_domain(),
1826             tile_extents,
1827             tile_order);
1828       }
1829     }
1830   } else {
1831     auto fragment_num = (int)fragment_metadata_.size();
1832     if (fragment_num > 0) {
1833       for (int i = fragment_num - 1; i >= 0; --i) {
1834         if (fragment_metadata_[i]->dense()) {
1835           frag_tile_domains.emplace_back(
1836               i,
1837               domain,
1838               fragment_metadata_[i]->non_empty_domain(),
1839               tile_extents,
1840               tile_order);
1841         }
1842       }
1843     }
1844   }
1845 
1846   // Get tile coords and array domain
1847   const auto& tile_coords = subarray->tile_coords();
1848   TileDomain<T> array_tile_domain(
1849       UINT32_MAX, domain, domain, tile_extents, tile_order);
1850 
1851   // Compute result space tiles
1852   compute_result_space_tiles<T>(
1853       array_schema_->domain(),
1854       tile_coords,
1855       array_tile_domain,
1856       frag_tile_domains,
1857       result_space_tiles);
1858 }
1859 
has_coords() const1860 bool ReaderBase::has_coords() const {
1861   for (const auto& it : buffers_) {
1862     if (it.first == constants::coords || array_schema_->is_dim(it.first))
1863       return true;
1864   }
1865 
1866   return false;
1867 }
1868 
1869 template <class T>
fill_dense_coords(const Subarray & subarray)1870 Status ReaderBase::fill_dense_coords(const Subarray& subarray) {
1871   auto timer_se = stats_->start_timer("fill_dense_coords");
1872 
1873   // Reading coordinates with a query condition is currently unsupported.
1874   // Query conditions mutate the result cell slabs to filter attributes.
1875   // This path does not use result cell slabs, which will fill coordinates
1876   // for cells that should be filtered out.
1877   if (!condition_.empty()) {
1878     return logger_->status(
1879         Status::ReaderError("Cannot read dense coordinates; dense coordinate "
1880                             "reads are unsupported with a query condition"));
1881   }
1882 
1883   // Prepare buffers
1884   std::vector<unsigned> dim_idx;
1885   std::vector<QueryBuffer*> buffers;
1886   auto coords_it = buffers_.find(constants::coords);
1887   auto dim_num = array_schema_->dim_num();
1888   if (coords_it != buffers_.end()) {
1889     buffers.emplace_back(&(coords_it->second));
1890     dim_idx.emplace_back(dim_num);
1891   } else {
1892     for (unsigned d = 0; d < dim_num; ++d) {
1893       const auto& dim = array_schema_->dimension(d);
1894       auto it = buffers_.find(dim->name());
1895       if (it != buffers_.end()) {
1896         buffers.emplace_back(&(it->second));
1897         dim_idx.emplace_back(d);
1898       }
1899     }
1900   }
1901   std::vector<uint64_t> offsets(buffers.size(), 0);
1902 
1903   if (layout_ == Layout::GLOBAL_ORDER) {
1904     RETURN_NOT_OK(
1905         fill_dense_coords_global<T>(subarray, dim_idx, buffers, &offsets));
1906   } else {
1907     assert(layout_ == Layout::ROW_MAJOR || layout_ == Layout::COL_MAJOR);
1908     RETURN_NOT_OK(
1909         fill_dense_coords_row_col<T>(subarray, dim_idx, buffers, &offsets));
1910   }
1911 
1912   // Update buffer sizes
1913   for (size_t i = 0; i < buffers.size(); ++i)
1914     *(buffers[i]->buffer_size_) = offsets[i];
1915 
1916   return Status::Ok();
1917 }
1918 
1919 template <class T>
fill_dense_coords_global(const Subarray & subarray,const std::vector<unsigned> & dim_idx,const std::vector<QueryBuffer * > & buffers,std::vector<uint64_t> * offsets)1920 Status ReaderBase::fill_dense_coords_global(
1921     const Subarray& subarray,
1922     const std::vector<unsigned>& dim_idx,
1923     const std::vector<QueryBuffer*>& buffers,
1924     std::vector<uint64_t>* offsets) {
1925   auto tile_coords = subarray.tile_coords();
1926   auto cell_order = array_schema_->cell_order();
1927 
1928   for (const auto& tc : tile_coords) {
1929     auto tile_subarray = subarray.crop_to_tile((const T*)&tc[0], cell_order);
1930     RETURN_NOT_OK(
1931         fill_dense_coords_row_col<T>(tile_subarray, dim_idx, buffers, offsets));
1932   }
1933 
1934   return Status::Ok();
1935 }
1936 
1937 template <class T>
fill_dense_coords_row_col(const Subarray & subarray,const std::vector<unsigned> & dim_idx,const std::vector<QueryBuffer * > & buffers,std::vector<uint64_t> * offsets)1938 Status ReaderBase::fill_dense_coords_row_col(
1939     const Subarray& subarray,
1940     const std::vector<unsigned>& dim_idx,
1941     const std::vector<QueryBuffer*>& buffers,
1942     std::vector<uint64_t>* offsets) {
1943   auto cell_order = array_schema_->cell_order();
1944   auto dim_num = array_schema_->dim_num();
1945 
1946   // Iterate over all coordinates, retrieved in cell slabs
1947   CellSlabIter<T> iter(&subarray);
1948   RETURN_CANCEL_OR_ERROR(iter.begin());
1949   while (!iter.end()) {
1950     auto cell_slab = iter.cell_slab();
1951     auto coords_num = cell_slab.length_;
1952 
1953     // Check for overflow
1954     for (size_t i = 0; i < buffers.size(); ++i) {
1955       auto idx = (dim_idx[i] == dim_num) ? 0 : dim_idx[i];
1956       auto dim = array_schema_->domain()->dimension(idx);
1957       auto coord_size = dim->coord_size();
1958       coord_size = (dim_idx[i] == dim_num) ? coord_size * dim_num : coord_size;
1959       auto buff_size = *(buffers[i]->buffer_size_);
1960       auto offset = (*offsets)[i];
1961       if (coords_num * coord_size + offset > buff_size) {
1962         copy_overflowed_ = true;
1963         return Status::Ok();
1964       }
1965     }
1966 
1967     // Copy slab
1968     if (layout_ == Layout::ROW_MAJOR ||
1969         (layout_ == Layout::GLOBAL_ORDER && cell_order == Layout::ROW_MAJOR))
1970       fill_dense_coords_row_slab(
1971           &cell_slab.coords_[0], coords_num, dim_idx, buffers, offsets);
1972     else
1973       fill_dense_coords_col_slab(
1974           &cell_slab.coords_[0], coords_num, dim_idx, buffers, offsets);
1975 
1976     ++iter;
1977   }
1978 
1979   return Status::Ok();
1980 }
1981 
1982 template <class T>
fill_dense_coords_row_slab(const T * start,uint64_t num,const std::vector<unsigned> & dim_idx,const std::vector<QueryBuffer * > & buffers,std::vector<uint64_t> * offsets) const1983 void ReaderBase::fill_dense_coords_row_slab(
1984     const T* start,
1985     uint64_t num,
1986     const std::vector<unsigned>& dim_idx,
1987     const std::vector<QueryBuffer*>& buffers,
1988     std::vector<uint64_t>* offsets) const {
1989   // For easy reference
1990   auto dim_num = array_schema_->dim_num();
1991 
1992   // Special zipped coordinates
1993   if (dim_idx.size() == 1 && dim_idx[0] == dim_num) {
1994     auto c_buff = (char*)buffers[0]->buffer_;
1995     auto offset = &(*offsets)[0];
1996 
1997     // Fill coordinates
1998     for (uint64_t i = 0; i < num; ++i) {
1999       // First dim-1 dimensions are copied as they are
2000       if (dim_num > 1) {
2001         auto bytes_to_copy = (dim_num - 1) * sizeof(T);
2002         std::memcpy(c_buff + *offset, start, bytes_to_copy);
2003         *offset += bytes_to_copy;
2004       }
2005 
2006       // Last dimension is incremented by `i`
2007       auto new_coord = start[dim_num - 1] + i;
2008       std::memcpy(c_buff + *offset, &new_coord, sizeof(T));
2009       *offset += sizeof(T);
2010     }
2011   } else {  // Set of separate coordinate buffers
2012     for (uint64_t i = 0; i < num; ++i) {
2013       for (size_t b = 0; b < buffers.size(); ++b) {
2014         auto c_buff = (char*)buffers[b]->buffer_;
2015         auto offset = &(*offsets)[b];
2016 
2017         // First dim-1 dimensions are copied as they are
2018         if (dim_num > 1 && dim_idx[b] < dim_num - 1) {
2019           std::memcpy(c_buff + *offset, &start[dim_idx[b]], sizeof(T));
2020           *offset += sizeof(T);
2021         } else {
2022           // Last dimension is incremented by `i`
2023           auto new_coord = start[dim_num - 1] + i;
2024           std::memcpy(c_buff + *offset, &new_coord, sizeof(T));
2025           *offset += sizeof(T);
2026         }
2027       }
2028     }
2029   }
2030 }
2031 
2032 template <class T>
fill_dense_coords_col_slab(const T * start,uint64_t num,const std::vector<unsigned> & dim_idx,const std::vector<QueryBuffer * > & buffers,std::vector<uint64_t> * offsets) const2033 void ReaderBase::fill_dense_coords_col_slab(
2034     const T* start,
2035     uint64_t num,
2036     const std::vector<unsigned>& dim_idx,
2037     const std::vector<QueryBuffer*>& buffers,
2038     std::vector<uint64_t>* offsets) const {
2039   // For easy reference
2040   auto dim_num = array_schema_->dim_num();
2041 
2042   // Special zipped coordinates
2043   if (dim_idx.size() == 1 && dim_idx[0] == dim_num) {
2044     auto c_buff = (char*)buffers[0]->buffer_;
2045     auto offset = &(*offsets)[0];
2046 
2047     // Fill coordinates
2048     for (uint64_t i = 0; i < num; ++i) {
2049       // First dimension is incremented by `i`
2050       auto new_coord = start[0] + i;
2051       std::memcpy(c_buff + *offset, &new_coord, sizeof(T));
2052       *offset += sizeof(T);
2053 
2054       // Last dim-1 dimensions are copied as they are
2055       if (dim_num > 1) {
2056         auto bytes_to_copy = (dim_num - 1) * sizeof(T);
2057         std::memcpy(c_buff + *offset, &start[1], bytes_to_copy);
2058         *offset += bytes_to_copy;
2059       }
2060     }
2061   } else {  // Separate coordinate buffers
2062     for (uint64_t i = 0; i < num; ++i) {
2063       for (size_t b = 0; b < buffers.size(); ++b) {
2064         auto c_buff = (char*)buffers[b]->buffer_;
2065         auto offset = &(*offsets)[b];
2066 
2067         // First dimension is incremented by `i`
2068         if (dim_idx[b] == 0) {
2069           auto new_coord = start[0] + i;
2070           std::memcpy(c_buff + *offset, &new_coord, sizeof(T));
2071           *offset += sizeof(T);
2072         } else {  // Last dim-1 dimensions are copied as they are
2073           std::memcpy(c_buff + *offset, &start[dim_idx[b]], sizeof(T));
2074           *offset += sizeof(T);
2075         }
2076       }
2077     }
2078   }
2079 }
2080 
// Explicit template instantiations. The templated member definitions above
// live in this translation unit, so each supported coordinate datatype is
// instantiated here explicitly for use from other translation units.
template void ReaderBase::compute_result_space_tiles<int8_t>(
    const Subarray* subarray,
    const Subarray* partitioner_subarray,
    std::map<const int8_t*, ResultSpaceTile<int8_t>>* result_space_tiles) const;
template void ReaderBase::compute_result_space_tiles<uint8_t>(
    const Subarray* subarray,
    const Subarray* partitioner_subarray,
    std::map<const uint8_t*, ResultSpaceTile<uint8_t>>* result_space_tiles)
    const;
template void ReaderBase::compute_result_space_tiles<int16_t>(
    const Subarray* subarray,
    const Subarray* partitioner_subarray,
    std::map<const int16_t*, ResultSpaceTile<int16_t>>* result_space_tiles)
    const;
template void ReaderBase::compute_result_space_tiles<uint16_t>(
    const Subarray* subarray,
    const Subarray* partitioner_subarray,
    std::map<const uint16_t*, ResultSpaceTile<uint16_t>>* result_space_tiles)
    const;
template void ReaderBase::compute_result_space_tiles<int32_t>(
    const Subarray* subarray,
    const Subarray* partitioner_subarray,
    std::map<const int32_t*, ResultSpaceTile<int32_t>>* result_space_tiles)
    const;
template void ReaderBase::compute_result_space_tiles<uint32_t>(
    const Subarray* subarray,
    const Subarray* partitioner_subarray,
    std::map<const uint32_t*, ResultSpaceTile<uint32_t>>* result_space_tiles)
    const;
template void ReaderBase::compute_result_space_tiles<int64_t>(
    const Subarray* subarray,
    const Subarray* partitioner_subarray,
    std::map<const int64_t*, ResultSpaceTile<int64_t>>* result_space_tiles)
    const;
template void ReaderBase::compute_result_space_tiles<uint64_t>(
    const Subarray* subarray,
    const Subarray* partitioner_subarray,
    std::map<const uint64_t*, ResultSpaceTile<uint64_t>>* result_space_tiles)
    const;

template Status ReaderBase::fill_dense_coords<int8_t>(const Subarray&);
template Status ReaderBase::fill_dense_coords<uint8_t>(const Subarray&);
template Status ReaderBase::fill_dense_coords<int16_t>(const Subarray&);
template Status ReaderBase::fill_dense_coords<uint16_t>(const Subarray&);
template Status ReaderBase::fill_dense_coords<int32_t>(const Subarray&);
template Status ReaderBase::fill_dense_coords<uint32_t>(const Subarray&);
template Status ReaderBase::fill_dense_coords<int64_t>(const Subarray&);
template Status ReaderBase::fill_dense_coords<uint64_t>(const Subarray&);
2130 
2131 }  // namespace sm
2132 }  // namespace tiledb
2133