1 /**
2 * @file reader_base.cc
3 *
4 * @section LICENSE
5 *
6 * The MIT License
7 *
8 * @copyright Copyright (c) 2017-2021 TileDB, Inc.
9 *
10 * Permission is hereby granted, free of charge, to any person obtaining a copy
11 * of this software and associated documentation files (the "Software"), to deal
12 * in the Software without restriction, including without limitation the rights
13 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
14 * copies of the Software, and to permit persons to whom the Software is
15 * furnished to do so, subject to the following conditions:
16 *
17 * The above copyright notice and this permission notice shall be included in
18 * all copies or substantial portions of the Software.
19 *
20 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
21 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
22 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
23 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
24 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
25 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
26 * THE SOFTWARE.
27 *
28 * @section DESCRIPTION
29 *
30 * This file implements class ReaderBase.
31 */
32
33 #include "tiledb/sm/query/reader_base.h"
34 #include "tiledb/common/logger.h"
35 #include "tiledb/sm/array/array.h"
36 #include "tiledb/sm/array_schema/array_schema.h"
37 #include "tiledb/sm/filesystem/vfs.h"
38 #include "tiledb/sm/fragment/fragment_metadata.h"
39 #include "tiledb/sm/misc/parallel_functions.h"
40 #include "tiledb/sm/query/query_macros.h"
41 #include "tiledb/sm/query/strategy_base.h"
42 #include "tiledb/sm/subarray/cell_slab_iter.h"
43 #include "tiledb/sm/subarray/subarray.h"
44
45 namespace tiledb {
46 namespace sm {
47
48 /* ****************************** */
49 /* CONSTRUCTORS */
50 /* ****************************** */
51
ReaderBase(stats::Stats * stats,tdb_shared_ptr<Logger> logger,StorageManager * storage_manager,Array * array,Config & config,std::unordered_map<std::string,QueryBuffer> & buffers,Subarray & subarray,Layout layout,QueryCondition & condition)52 ReaderBase::ReaderBase(
53 stats::Stats* stats,
54 tdb_shared_ptr<Logger> logger,
55 StorageManager* storage_manager,
56 Array* array,
57 Config& config,
58 std::unordered_map<std::string, QueryBuffer>& buffers,
59 Subarray& subarray,
60 Layout layout,
61 QueryCondition& condition)
62 : StrategyBase(
63 stats,
64 logger,
65 storage_manager,
66 array,
67 config,
68 buffers,
69 subarray,
70 layout)
71 , condition_(condition)
72 , fix_var_sized_overflows_(false)
73 , clear_coords_tiles_on_copy_(true)
74 , copy_overflowed_(false) {
75 if (array != nullptr)
76 fragment_metadata_ = array->fragment_metadata();
77
78 auto uint64_t_max = std::numeric_limits<uint64_t>::max();
79 copy_end_ = std::pair<uint64_t, uint64_t>(uint64_t_max, uint64_t_max);
80 }
81
82 /* ********************************* */
83 /* STATIC FUNCTIONS */
84 /* ********************************* */
85
86 template <class T>
compute_result_space_tiles(const Domain * domain,const std::vector<std::vector<uint8_t>> & tile_coords,const TileDomain<T> & array_tile_domain,const std::vector<TileDomain<T>> & frag_tile_domains,std::map<const T *,ResultSpaceTile<T>> * result_space_tiles)87 void ReaderBase::compute_result_space_tiles(
88 const Domain* domain,
89 const std::vector<std::vector<uint8_t>>& tile_coords,
90 const TileDomain<T>& array_tile_domain,
91 const std::vector<TileDomain<T>>& frag_tile_domains,
92 std::map<const T*, ResultSpaceTile<T>>* result_space_tiles) {
93 auto fragment_num = (unsigned)frag_tile_domains.size();
94 auto dim_num = array_tile_domain.dim_num();
95 std::vector<T> start_coords;
96 const T* coords;
97 start_coords.resize(dim_num);
98
99 // For all tile coordinates
100 for (const auto& tc : tile_coords) {
101 coords = (T*)(&(tc[0]));
102 start_coords = array_tile_domain.start_coords(coords);
103
104 // Create result space tile and insert into the map
105 auto r = result_space_tiles->emplace(coords, ResultSpaceTile<T>());
106 auto& result_space_tile = r.first->second;
107 result_space_tile.set_start_coords(start_coords);
108
109 // Add fragment info to the result space tile
110 for (unsigned f = 0; f < fragment_num; ++f) {
111 // Check if the fragment overlaps with the space tile
112 if (!frag_tile_domains[f].in_tile_domain(coords))
113 continue;
114
115 // Check if any previous fragment covers this fragment
116 // for the tile identified by `coords`
117 bool covered = false;
118 for (unsigned j = 0; j < f; ++j) {
119 if (frag_tile_domains[j].covers(coords, frag_tile_domains[f])) {
120 covered = true;
121 break;
122 }
123 }
124
125 // Exclude this fragment from the space tile
126 if (covered)
127 continue;
128
129 // Include this fragment in the space tile
130 auto frag_domain = frag_tile_domains[f].domain_slice();
131 auto frag_idx = frag_tile_domains[f].id();
132 result_space_tile.append_frag_domain(frag_idx, frag_domain);
133 auto tile_idx = frag_tile_domains[f].tile_pos(coords);
134 ResultTile result_tile(frag_idx, tile_idx, domain);
135 result_space_tile.set_result_tile(frag_idx, result_tile);
136 }
137 }
138 }
139
140 /* ****************************** */
141 /* PROTECTED METHODS */
142 /* ****************************** */
143
clear_tiles(const std::string & name,const std::vector<ResultTile * > * result_tiles) const144 void ReaderBase::clear_tiles(
145 const std::string& name,
146 const std::vector<ResultTile*>* result_tiles) const {
147 for (auto& result_tile : *result_tiles)
148 result_tile->erase_tile(name);
149 }
150
reset_buffer_sizes()151 void ReaderBase::reset_buffer_sizes() {
152 for (auto& it : buffers_) {
153 *(it.second.buffer_size_) = it.second.original_buffer_size_;
154 if (it.second.buffer_var_size_ != nullptr)
155 *(it.second.buffer_var_size_) = it.second.original_buffer_var_size_;
156 if (it.second.validity_vector_.buffer_size() != nullptr)
157 *(it.second.validity_vector_.buffer_size()) =
158 it.second.original_validity_vector_size_;
159 }
160 }
161
zero_out_buffer_sizes()162 void ReaderBase::zero_out_buffer_sizes() {
163 for (auto& buffer : buffers_) {
164 if (buffer.second.buffer_size_ != nullptr)
165 *(buffer.second.buffer_size_) = 0;
166 if (buffer.second.buffer_var_size_ != nullptr)
167 *(buffer.second.buffer_var_size_) = 0;
168 if (buffer.second.validity_vector_.buffer_size() != nullptr)
169 *(buffer.second.validity_vector_.buffer_size()) = 0;
170 }
171 }
172
check_subarray() const173 Status ReaderBase::check_subarray() const {
174 if (subarray_.layout() == Layout::GLOBAL_ORDER && subarray_.range_num() != 1)
175 return logger_->status(Status::ReaderError(
176 "Cannot initialize reader; Multi-range subarrays with "
177 "global order layout are not supported"));
178
179 return Status::Ok();
180 }
181
check_validity_buffer_sizes() const182 Status ReaderBase::check_validity_buffer_sizes() const {
183 // Verify that the validity buffer size for each
184 // nullable attribute is large enough to contain
185 // a validity value for each cell.
186 for (const auto& it : buffers_) {
187 const std::string& name = it.first;
188 if (array_schema_->is_nullable(name)) {
189 const uint64_t buffer_size = *it.second.buffer_size_;
190
191 uint64_t min_cell_num = 0;
192 if (array_schema_->var_size(name)) {
193 min_cell_num = buffer_size / constants::cell_var_offset_size;
194
195 // If the offsets buffer contains an extra element to mark
196 // the offset to the end of the data buffer, we do not need
197 // a validity value for that extra offset.
198 if (offsets_extra_element_)
199 min_cell_num = std::min<uint64_t>(0, min_cell_num - 1);
200 } else {
201 min_cell_num = buffer_size / array_schema_->cell_size(name);
202 }
203
204 const uint64_t buffer_validity_size =
205 *it.second.validity_vector_.buffer_size();
206 const uint64_t cell_validity_num =
207 buffer_validity_size / constants::cell_validity_size;
208
209 if (cell_validity_num < min_cell_num) {
210 std::stringstream ss;
211 ss << "Buffer sizes check failed; Invalid number of validity cells "
212 "given for ";
213 ss << "attribute '" << name << "'";
214 ss << " (" << cell_validity_num << " < " << min_cell_num << ")";
215 return logger_->status(Status::ReaderError(ss.str()));
216 }
217 }
218 }
219
220 return Status::Ok();
221 }
222
load_tile_offsets(Subarray * subarray,const std::vector<std::string> * names)223 Status ReaderBase::load_tile_offsets(
224 Subarray* subarray, const std::vector<std::string>* names) {
225 auto timer_se = stats_->start_timer("load_tile_offsets");
226 const auto encryption_key = array_->encryption_key();
227
228 // Fetch relevant fragments so we load tile offsets only from intersecting
229 // fragments
230 const auto relevant_fragments = subarray->relevant_fragments();
231
232 bool all_frag = !subarray->is_set();
233
234 const auto status = parallel_for(
235 storage_manager_->compute_tp(),
236 0,
237 all_frag ? fragment_metadata_.size() : relevant_fragments->size(),
238 [&](const uint64_t i) {
239 auto frag_idx = all_frag ? i : relevant_fragments->at(i);
240 auto& fragment = fragment_metadata_[frag_idx];
241 const auto format_version = fragment->format_version();
242
243 // Filter the 'names' for format-specific names.
244 std::vector<std::string> filtered_names;
245 filtered_names.reserve(names->size());
246 auto schema = fragment->array_schema();
247 for (const auto& name : *names) {
248 // Applicable for zipped coordinates only to versions < 5
249 if (name == constants::coords && format_version >= 5)
250 continue;
251
252 // Applicable to separate coordinates only to versions >= 5
253 const auto is_dim = schema->is_dim(name);
254 if (is_dim && format_version < 5)
255 continue;
256
257 // Not a member of array schema, this field was added in array schema
258 // evolution, ignore for this fragment's tile offsets
259 if (!schema->is_field(name))
260 continue;
261
262 filtered_names.emplace_back(name);
263 }
264
265 RETURN_NOT_OK(fragment->load_tile_offsets(
266 *encryption_key, std::move(filtered_names)));
267 return Status::Ok();
268 });
269
270 RETURN_NOT_OK(status);
271
272 return Status::Ok();
273 }
274
load_tile_var_sizes(Subarray * subarray,const std::vector<std::string> * names)275 Status ReaderBase::load_tile_var_sizes(
276 Subarray* subarray, const std::vector<std::string>* names) {
277 auto timer_se = stats_->start_timer("load_tile_var_sizes");
278 const auto encryption_key = array_->encryption_key();
279
280 // Fetch relevant fragments so we load tile var sizes only from intersecting
281 // fragments
282 const auto relevant_fragments = subarray->relevant_fragments();
283
284 bool all_frag = !subarray->is_set();
285
286 const auto status = parallel_for(
287 storage_manager_->compute_tp(),
288 0,
289 all_frag ? fragment_metadata_.size() : relevant_fragments->size(),
290 [&](const uint64_t i) {
291 auto frag_idx = all_frag ? i : relevant_fragments->at(i);
292 auto& fragment = fragment_metadata_[frag_idx];
293
294 auto schema = fragment->array_schema();
295 for (const auto& name : *names) {
296 // Not a var size attribute.
297 if (!schema->var_size(name))
298 continue;
299
300 // Not a member of array schema, this field was added in array schema
301 // evolution, ignore for this fragment's tile var sizes.
302 if (!schema->is_field(name))
303 continue;
304
305 fragment->load_tile_var_sizes(*encryption_key, name);
306 }
307
308 return Status::Ok();
309 });
310
311 RETURN_NOT_OK(status);
312
313 return Status::Ok();
314 }
315
init_tile(uint32_t format_version,const std::string & name,Tile * tile) const316 Status ReaderBase::init_tile(
317 uint32_t format_version, const std::string& name, Tile* tile) const {
318 // For easy reference
319 auto cell_size = array_schema_->cell_size(name);
320 auto type = array_schema_->type(name);
321 auto is_coords = (name == constants::coords);
322 auto dim_num = (is_coords) ? array_schema_->dim_num() : 0;
323
324 // Initialize
325 RETURN_NOT_OK(tile->init_filtered(format_version, type, cell_size, dim_num));
326
327 return Status::Ok();
328 }
329
init_tile(uint32_t format_version,const std::string & name,Tile * tile,Tile * tile_var) const330 Status ReaderBase::init_tile(
331 uint32_t format_version,
332 const std::string& name,
333 Tile* tile,
334 Tile* tile_var) const {
335 // For easy reference
336 auto type = array_schema_->type(name);
337
338 // Initialize
339 RETURN_NOT_OK(tile->init_filtered(
340 format_version,
341 constants::cell_var_offset_type,
342 constants::cell_var_offset_size,
343 0));
344 RETURN_NOT_OK(
345 tile_var->init_filtered(format_version, type, datatype_size(type), 0));
346 return Status::Ok();
347 }
348
init_tile_nullable(uint32_t format_version,const std::string & name,Tile * tile,Tile * tile_validity) const349 Status ReaderBase::init_tile_nullable(
350 uint32_t format_version,
351 const std::string& name,
352 Tile* tile,
353 Tile* tile_validity) const {
354 // For easy reference
355 auto cell_size = array_schema_->cell_size(name);
356 auto type = array_schema_->type(name);
357 auto is_coords = (name == constants::coords);
358 auto dim_num = (is_coords) ? array_schema_->dim_num() : 0;
359
360 // Initialize
361 RETURN_NOT_OK(tile->init_filtered(format_version, type, cell_size, dim_num));
362 RETURN_NOT_OK(tile_validity->init_filtered(
363 format_version,
364 constants::cell_validity_type,
365 constants::cell_validity_size,
366 0));
367
368 return Status::Ok();
369 }
370
init_tile_nullable(uint32_t format_version,const std::string & name,Tile * tile,Tile * tile_var,Tile * tile_validity) const371 Status ReaderBase::init_tile_nullable(
372 uint32_t format_version,
373 const std::string& name,
374 Tile* tile,
375 Tile* tile_var,
376 Tile* tile_validity) const {
377 // For easy reference
378 auto type = array_schema_->type(name);
379
380 // Initialize
381 RETURN_NOT_OK(tile->init_filtered(
382 format_version,
383 constants::cell_var_offset_type,
384 constants::cell_var_offset_size,
385 0));
386 RETURN_NOT_OK(
387 tile_var->init_filtered(format_version, type, datatype_size(type), 0));
388 RETURN_NOT_OK(tile_validity->init_filtered(
389 format_version,
390 constants::cell_validity_type,
391 constants::cell_validity_size,
392 0));
393 return Status::Ok();
394 }
395
read_attribute_tiles(const std::vector<std::string> * names,const std::vector<ResultTile * > * result_tiles) const396 Status ReaderBase::read_attribute_tiles(
397 const std::vector<std::string>* names,
398 const std::vector<ResultTile*>* result_tiles) const {
399 auto timer_se = stats_->start_timer("attr_tiles");
400 return read_tiles(names, result_tiles);
401 }
402
read_coordinate_tiles(const std::vector<std::string> * names,const std::vector<ResultTile * > * result_tiles) const403 Status ReaderBase::read_coordinate_tiles(
404 const std::vector<std::string>* names,
405 const std::vector<ResultTile*>* result_tiles) const {
406 auto timer_se = stats_->start_timer("coord_tiles");
407 return read_tiles(names, result_tiles);
408 }
409
read_tiles(const std::vector<std::string> * names,const std::vector<ResultTile * > * result_tiles) const410 Status ReaderBase::read_tiles(
411 const std::vector<std::string>* names,
412 const std::vector<ResultTile*>* result_tiles) const {
413 // Shortcut for empty tile vec
414 if (result_tiles->empty())
415 return Status::Ok();
416
417 // Reading tiles are thread safe. However, we will perform
418 // them on this thread if there is only one read to perform.
419 if (names->size() == 1) {
420 RETURN_NOT_OK(read_tiles(names->at(0), result_tiles));
421 } else {
422 const auto status = parallel_for(
423 storage_manager_->compute_tp(),
424 0,
425 names->size(),
426 [&](const uint64_t i) {
427 RETURN_NOT_OK(read_tiles(names->at(i), result_tiles));
428 return Status::Ok();
429 });
430
431 RETURN_NOT_OK(status);
432 }
433
434 return Status::Ok();
435 }
436
read_tiles(const std::string & name,const std::vector<ResultTile * > * result_tiles) const437 Status ReaderBase::read_tiles(
438 const std::string& name,
439 const std::vector<ResultTile*>* result_tiles) const {
440 // Shortcut for empty tile vec
441 if (result_tiles->empty())
442 return Status::Ok();
443
444 // Read the tiles asynchronously
445 std::vector<ThreadPool::Task> tasks;
446 RETURN_CANCEL_OR_ERROR(read_tiles(name, result_tiles, &tasks));
447
448 // Wait for the reads to finish and check statuses.
449 auto statuses = storage_manager_->io_tp()->wait_all_status(tasks);
450 for (const auto& st : statuses)
451 RETURN_CANCEL_OR_ERROR(st);
452
453 return Status::Ok();
454 }
455
read_tiles(const std::string & name,const std::vector<ResultTile * > * result_tiles,std::vector<ThreadPool::Task> * const tasks) const456 Status ReaderBase::read_tiles(
457 const std::string& name,
458 const std::vector<ResultTile*>* result_tiles,
459 std::vector<ThreadPool::Task>* const tasks) const {
460 // Shortcut for empty tile vec
461 if (result_tiles->empty())
462 return Status::Ok();
463
464 // For each tile, read from its fragment.
465 const bool var_size = array_schema_->var_size(name);
466 const bool nullable = array_schema_->is_nullable(name);
467
468 // Gather the unique fragments indexes for which there are tiles
469 std::unordered_set<uint32_t> fragment_idxs_set;
470 for (const auto& tile : *result_tiles)
471 fragment_idxs_set.emplace(tile->frag_idx());
472
473 // Put fragment indexes in a vector
474 std::vector<uint32_t> fragment_idxs_vec;
475 fragment_idxs_vec.reserve(fragment_idxs_set.size());
476 for (const auto& idx : fragment_idxs_set)
477 fragment_idxs_vec.emplace_back(idx);
478
479 // Protect all elements within `result_tiles`.
480 std::unique_lock<std::mutex> ul(result_tiles_mutex_);
481
482 // Populate the list of regions per file to be read.
483 std::map<URI, std::vector<std::tuple<uint64_t, void*, uint64_t>>> all_regions;
484 for (const auto& tile : *result_tiles) {
485 auto const fragment = fragment_metadata_[tile->frag_idx()];
486 const uint32_t format_version = fragment->format_version();
487
488 // Applicable for zipped coordinates only to versions < 5
489 if (name == constants::coords && format_version >= 5)
490 continue;
491
492 // Applicable to separate coordinates only to versions >= 5
493 const bool is_dim = array_schema_->is_dim(name);
494 if (is_dim && format_version < 5)
495 continue;
496
497 // If the fragment doesn't have the attribute, this is a schema evolution
498 // field and will be treated with fill-in value instead of reading from disk
499 if (!fragment->array_schema()->is_field(name))
500 continue;
501
502 // Initialize the tile(s)
503 if (is_dim) {
504 const uint64_t dim_num = array_schema_->dim_num();
505 for (uint64_t d = 0; d < dim_num; ++d) {
506 if (array_schema_->dimension(d)->name() == name) {
507 tile->init_coord_tile(name, d);
508 break;
509 }
510 }
511 } else {
512 tile->init_attr_tile(name);
513 }
514
515 ResultTile::TileTuple* const tile_tuple = tile->tile_tuple(name);
516 assert(tile_tuple != nullptr);
517 Tile* const t = &std::get<0>(*tile_tuple);
518 Tile* const t_var = &std::get<1>(*tile_tuple);
519 Tile* const t_validity = &std::get<2>(*tile_tuple);
520 if (!var_size) {
521 if (nullable)
522 RETURN_NOT_OK(init_tile_nullable(format_version, name, t, t_validity));
523 else
524 RETURN_NOT_OK(init_tile(format_version, name, t));
525 } else {
526 if (nullable)
527 RETURN_NOT_OK(
528 init_tile_nullable(format_version, name, t, t_var, t_validity));
529 else
530 RETURN_NOT_OK(init_tile(format_version, name, t, t_var));
531 }
532
533 // Get information about the tile in its fragment
534 auto tile_attr_uri = fragment->uri(name);
535 auto tile_idx = tile->tile_idx();
536 uint64_t tile_attr_offset;
537 RETURN_NOT_OK(fragment->file_offset(name, tile_idx, &tile_attr_offset));
538 uint64_t tile_persisted_size;
539 RETURN_NOT_OK(
540 fragment->persisted_tile_size(name, tile_idx, &tile_persisted_size));
541
542 // Try the cache first.
543 // TODO Parallelize.
544 bool cache_hit;
545 RETURN_NOT_OK(storage_manager_->read_from_cache(
546 tile_attr_uri,
547 tile_attr_offset,
548 t->filtered_buffer(),
549 tile_persisted_size,
550 &cache_hit));
551
552 if (!cache_hit) {
553 // Add the region of the fragment to be read.
554 RETURN_NOT_OK(t->filtered_buffer()->realloc(tile_persisted_size));
555 t->filtered_buffer()->set_size(tile_persisted_size);
556 t->filtered_buffer()->reset_offset();
557 all_regions[tile_attr_uri].emplace_back(
558 tile_attr_offset, t->filtered_buffer()->data(), tile_persisted_size);
559 }
560
561 if (var_size) {
562 auto tile_attr_var_uri = fragment->var_uri(name);
563 uint64_t tile_attr_var_offset;
564 RETURN_NOT_OK(
565 fragment->file_var_offset(name, tile_idx, &tile_attr_var_offset));
566 uint64_t tile_var_persisted_size;
567 RETURN_NOT_OK(fragment->persisted_tile_var_size(
568 name, tile_idx, &tile_var_persisted_size));
569
570 Buffer cached_var_buffer;
571 RETURN_NOT_OK(storage_manager_->read_from_cache(
572 tile_attr_var_uri,
573 tile_attr_var_offset,
574 t_var->filtered_buffer(),
575 tile_var_persisted_size,
576 &cache_hit));
577
578 if (!cache_hit) {
579 // Add the region of the fragment to be read.
580 RETURN_NOT_OK(
581 t_var->filtered_buffer()->realloc(tile_var_persisted_size));
582 t_var->filtered_buffer()->set_size(tile_var_persisted_size);
583 t_var->filtered_buffer()->reset_offset();
584 all_regions[tile_attr_var_uri].emplace_back(
585 tile_attr_var_offset,
586 t_var->filtered_buffer()->data(),
587 tile_var_persisted_size);
588 }
589 }
590
591 if (nullable) {
592 auto tile_validity_attr_uri = fragment->validity_uri(name);
593 uint64_t tile_attr_validity_offset;
594 RETURN_NOT_OK(fragment->file_validity_offset(
595 name, tile_idx, &tile_attr_validity_offset));
596 uint64_t tile_validity_persisted_size;
597 RETURN_NOT_OK(fragment->persisted_tile_validity_size(
598 name, tile_idx, &tile_validity_persisted_size));
599
600 Buffer cached_valdity_buffer;
601 RETURN_NOT_OK(storage_manager_->read_from_cache(
602 tile_validity_attr_uri,
603 tile_attr_validity_offset,
604 t_validity->filtered_buffer(),
605 tile_validity_persisted_size,
606 &cache_hit));
607
608 if (!cache_hit) {
609 // Add the region of the fragment to be read.
610 RETURN_NOT_OK(t_validity->filtered_buffer()->realloc(
611 tile_validity_persisted_size));
612 t_validity->filtered_buffer()->set_size(tile_validity_persisted_size);
613 t_validity->filtered_buffer()->reset_offset();
614 all_regions[tile_validity_attr_uri].emplace_back(
615 tile_attr_validity_offset,
616 t_validity->filtered_buffer()->data(),
617 tile_validity_persisted_size);
618 }
619 }
620 }
621
622 // We're done accessing elements within `result_tiles`.
623 ul.unlock();
624
625 // Do not use the read-ahead cache because tiles will be
626 // cached in the tile cache.
627 const bool use_read_ahead = false;
628
629 // Enqueue all regions to be read.
630 for (const auto& item : all_regions) {
631 RETURN_NOT_OK(storage_manager_->vfs()->read_all(
632 item.first,
633 item.second,
634 storage_manager_->io_tp(),
635 tasks,
636 use_read_ahead));
637 }
638
639 return Status::Ok();
640 }
641
// Unfilters (decompresses/decrypts) the tiles of `name` in all
// `result_tiles`, in parallel on the compute thread pool. Before
// unfiltering, each still-filtered buffer is written to the tile cache so a
// later read of the same region can be served from memory.
Status ReaderBase::unfilter_tiles(
    const std::string& name,
    const std::vector<ResultTile*>* result_tiles) const {
  auto stat_type = (array_schema_->is_attr(name)) ? "unfilter_attr_tiles" :
                                                    "unfilter_coord_tiles";
  auto timer_se = stats_->start_timer(stat_type);

  auto var_size = array_schema_->var_size(name);
  auto nullable = array_schema_->is_nullable(name);
  auto num_tiles = static_cast<uint64_t>(result_tiles->size());

  // One parallel task per result tile.
  auto status = parallel_for(
      storage_manager_->compute_tp(), 0, num_tiles, [&, this](uint64_t i) {
        ResultTile* const tile = result_tiles->at(i);

        auto& fragment = fragment_metadata_[tile->frag_idx()];
        auto format_version = fragment->format_version();

        // Applicable for zipped coordinates only to versions < 5
        // Applicable for separate coordinates only to version >= 5
        if (name != constants::coords ||
            (name == constants::coords && format_version < 5) ||
            (array_schema_->is_dim(name) && format_version >= 5)) {
          auto tile_tuple = tile->tile_tuple(name);

          // Skip non-existent attributes/dimensions (e.g. coords in the
          // dense case).
          if (tile_tuple == nullptr ||
              std::get<0>(*tile_tuple).filtered_buffer()->size() == 0)
            return Status::Ok();

          // Get information about the tile in its fragment.
          auto tile_attr_uri = fragment->uri(name);
          auto tile_idx = tile->tile_idx();
          uint64_t tile_attr_offset;
          RETURN_NOT_OK(
              fragment->file_offset(name, tile_idx, &tile_attr_offset));

          // The tuple holds: fixed (or offsets) tile, var tile,
          // validity tile.
          auto& t = std::get<0>(*tile_tuple);
          auto& t_var = std::get<1>(*tile_tuple);
          auto& t_validity = std::get<2>(*tile_tuple);

          // Cache 't'.
          if (t.filtered()) {
            // Store the filtered buffer in the tile cache.
            RETURN_NOT_OK(storage_manager_->write_to_cache(
                tile_attr_uri, tile_attr_offset, t.filtered_buffer()));
          }

          // Cache 't_var'.
          if (var_size && t_var.filtered()) {
            auto tile_attr_var_uri = fragment->var_uri(name);
            uint64_t tile_attr_var_offset;
            RETURN_NOT_OK(fragment->file_var_offset(
                name, tile_idx, &tile_attr_var_offset));

            // Store the filtered buffer in the tile cache.
            RETURN_NOT_OK(storage_manager_->write_to_cache(
                tile_attr_var_uri,
                tile_attr_var_offset,
                t_var.filtered_buffer()));
          }

          // Cache 't_validity'.
          if (nullable && t_validity.filtered()) {
            auto tile_attr_validity_uri = fragment->validity_uri(name);
            uint64_t tile_attr_validity_offset;
            RETURN_NOT_OK(fragment->file_validity_offset(
                name, tile_idx, &tile_attr_validity_offset));

            // Store the filtered buffer in the tile cache.
            RETURN_NOT_OK(storage_manager_->write_to_cache(
                tile_attr_validity_uri,
                tile_attr_validity_offset,
                t_validity.filtered_buffer()));
          }

          // Unfilter 't' for fixed-sized tiles, otherwise unfilter both 't' and
          // 't_var' for var-sized tiles.
          if (!var_size) {
            if (!nullable)
              RETURN_NOT_OK(unfilter_tile(name, &t));
            else
              RETURN_NOT_OK(unfilter_tile_nullable(name, &t, &t_validity));
          } else {
            if (!nullable)
              RETURN_NOT_OK(unfilter_tile(name, &t, &t_var));
            else
              RETURN_NOT_OK(
                  unfilter_tile_nullable(name, &t, &t_var, &t_validity));
          }
        }

        return Status::Ok();
      });

  RETURN_CANCEL_OR_ERROR(status);

  return Status::Ok();
}
742
unfilter_tile(const std::string & name,Tile * tile) const743 Status ReaderBase::unfilter_tile(const std::string& name, Tile* tile) const {
744 FilterPipeline filters = array_schema_->filters(name);
745
746 // Append an encryption unfilter when necessary.
747 RETURN_NOT_OK(FilterPipeline::append_encryption_filter(
748 &filters, array_->get_encryption_key()));
749
750 // Reverse the tile filters.
751 RETURN_NOT_OK(filters.run_reverse(
752 stats_,
753 tile,
754 storage_manager_->compute_tp(),
755 storage_manager_->config()));
756
757 return Status::Ok();
758 }
759
unfilter_tile(const std::string & name,Tile * tile,Tile * tile_var) const760 Status ReaderBase::unfilter_tile(
761 const std::string& name, Tile* tile, Tile* tile_var) const {
762 FilterPipeline offset_filters = array_schema_->cell_var_offsets_filters();
763 FilterPipeline filters = array_schema_->filters(name);
764
765 // Append an encryption unfilter when necessary.
766 RETURN_NOT_OK(FilterPipeline::append_encryption_filter(
767 &offset_filters, array_->get_encryption_key()));
768 RETURN_NOT_OK(FilterPipeline::append_encryption_filter(
769 &filters, array_->get_encryption_key()));
770
771 // Reverse the tile filters.
772 RETURN_NOT_OK(offset_filters.run_reverse(
773 stats_, tile, storage_manager_->compute_tp(), config_));
774 RETURN_NOT_OK(filters.run_reverse(
775 stats_, tile_var, storage_manager_->compute_tp(), config_));
776
777 return Status::Ok();
778 }
779
unfilter_tile_nullable(const std::string & name,Tile * tile,Tile * tile_validity) const780 Status ReaderBase::unfilter_tile_nullable(
781 const std::string& name, Tile* tile, Tile* tile_validity) const {
782 FilterPipeline filters = array_schema_->filters(name);
783 FilterPipeline validity_filters = array_schema_->cell_validity_filters();
784
785 // Append an encryption unfilter when necessary.
786 RETURN_NOT_OK(FilterPipeline::append_encryption_filter(
787 &filters, array_->get_encryption_key()));
788 RETURN_NOT_OK(FilterPipeline::append_encryption_filter(
789 &validity_filters, array_->get_encryption_key()));
790
791 // Reverse the tile filters.
792 RETURN_NOT_OK(filters.run_reverse(
793 stats_,
794 tile,
795 storage_manager_->compute_tp(),
796 storage_manager_->config()));
797
798 // Reverse the validity tile filters.
799 RETURN_NOT_OK(validity_filters.run_reverse(
800 stats_,
801 tile_validity,
802 storage_manager_->compute_tp(),
803 storage_manager_->config()));
804
805 return Status::Ok();
806 }
807
unfilter_tile_nullable(const std::string & name,Tile * tile,Tile * tile_var,Tile * tile_validity) const808 Status ReaderBase::unfilter_tile_nullable(
809 const std::string& name,
810 Tile* tile,
811 Tile* tile_var,
812 Tile* tile_validity) const {
813 FilterPipeline offset_filters = array_schema_->cell_var_offsets_filters();
814 FilterPipeline filters = array_schema_->filters(name);
815 FilterPipeline validity_filters = array_schema_->cell_validity_filters();
816
817 // Append an encryption unfilter when necessary.
818 RETURN_NOT_OK(FilterPipeline::append_encryption_filter(
819 &offset_filters, array_->get_encryption_key()));
820 RETURN_NOT_OK(FilterPipeline::append_encryption_filter(
821 &filters, array_->get_encryption_key()));
822 RETURN_NOT_OK(FilterPipeline::append_encryption_filter(
823 &validity_filters, array_->get_encryption_key()));
824
825 // Reverse the tile filters.
826 RETURN_NOT_OK(offset_filters.run_reverse(
827 stats_,
828 tile,
829 storage_manager_->compute_tp(),
830 storage_manager_->config()));
831 RETURN_NOT_OK(filters.run_reverse(
832 stats_,
833 tile_var,
834 storage_manager_->compute_tp(),
835 storage_manager_->config()));
836
837 // Reverse the validity tile filters.
838 RETURN_NOT_OK(validity_filters.run_reverse(
839 stats_,
840 tile_validity,
841 storage_manager_->compute_tp(),
842 storage_manager_->config()));
843
844 return Status::Ok();
845 }
846
copy_coordinates(const std::vector<ResultTile * > * result_tiles,std::vector<ResultCellSlab> * result_cell_slabs)847 Status ReaderBase::copy_coordinates(
848 const std::vector<ResultTile*>* result_tiles,
849 std::vector<ResultCellSlab>* result_cell_slabs) {
850 auto timer_se = stats_->start_timer("copy_coordinates");
851
852 if (result_cell_slabs->empty() && result_tiles->empty()) {
853 zero_out_buffer_sizes();
854 return Status::Ok();
855 }
856
857 const uint64_t stride = UINT64_MAX;
858
859 // Build a list of coordinate names to copy, separating them by
860 // whether they are of fixed or variable length. The motivation
861 // is that copying fixed and variable cells require two different
862 // cell slab partitions. Processing them separately allows us to
863 // reduce memory use.
864 std::vector<std::string> fixed_names;
865 std::vector<std::string> var_names;
866
867 for (const auto& it : buffers_) {
868 const auto& name = it.first;
869 if (copy_overflowed_)
870 break;
871 if (!(name == constants::coords || array_schema_->is_dim(name)))
872 continue;
873
874 if (array_schema_->var_size(name))
875 var_names.emplace_back(name);
876 else
877 fixed_names.emplace_back(name);
878 }
879
880 // Copy result cells for fixed-sized coordinates.
881 if (!fixed_names.empty()) {
882 std::vector<size_t> fixed_cs_partitions;
883 compute_fixed_cs_partitions(result_cell_slabs, &fixed_cs_partitions);
884
885 for (const auto& name : fixed_names) {
886 RETURN_CANCEL_OR_ERROR(copy_fixed_cells(
887 name, stride, result_cell_slabs, &fixed_cs_partitions));
888 if (clear_coords_tiles_on_copy_)
889 clear_tiles(name, result_tiles);
890 }
891 }
892
893 // Copy result cells for var-sized coordinates.
894 if (!var_names.empty()) {
895 std::vector<std::pair<size_t, size_t>> var_cs_partitions;
896 size_t total_var_cs_length;
897 compute_var_cs_partitions(
898 result_cell_slabs, &var_cs_partitions, &total_var_cs_length);
899
900 for (const auto& name : var_names) {
901 RETURN_CANCEL_OR_ERROR(copy_var_cells(
902 name,
903 stride,
904 result_cell_slabs,
905 &var_cs_partitions,
906 total_var_cs_length));
907 if (clear_coords_tiles_on_copy_)
908 clear_tiles(name, result_tiles);
909 }
910 }
911
912 return Status::Ok();
913 }
914
copy_attribute_values(const uint64_t stride,std::vector<ResultTile * > * result_tiles,std::vector<ResultCellSlab> * result_cell_slabs,Subarray & subarray,uint64_t memory_budget,bool include_dim)915 Status ReaderBase::copy_attribute_values(
916 const uint64_t stride,
917 std::vector<ResultTile*>* result_tiles,
918 std::vector<ResultCellSlab>* result_cell_slabs,
919 Subarray& subarray,
920 uint64_t memory_budget,
921 bool include_dim) {
922 auto timer_se = stats_->start_timer("copy_attr_values");
923
924 if (result_cell_slabs->empty() && result_tiles->empty()) {
925 zero_out_buffer_sizes();
926 return Status::Ok();
927 }
928
929 const std::unordered_set<std::string>& condition_names =
930 condition_.field_names();
931
932 // Build a set of attribute names to process.
933 std::unordered_map<std::string, ProcessTileFlags> names;
934 for (const auto& it : buffers_) {
935 const auto& name = it.first;
936
937 if (copy_overflowed_) {
938 break;
939 }
940
941 if (!include_dim &&
942 (name == constants::coords || array_schema_->is_dim(name))) {
943 continue;
944 }
945
946 // If the query condition has a clause for `name`, we will only
947 // flag it to copy because we have already preloaded the offsets
948 // and read the tiles in `apply_query_condition`.
949 ProcessTileFlags flags = ProcessTileFlag::COPY;
950 if (condition_names.count(name) == 0) {
951 flags |= ProcessTileFlag::READ;
952 }
953
954 names[name] = flags;
955 }
956
957 if (!names.empty()) {
958 RETURN_NOT_OK(process_tiles(
959 &names,
960 result_tiles,
961 result_cell_slabs,
962 &subarray,
963 stride,
964 memory_budget,
965 nullptr));
966 }
967
968 return Status::Ok();
969 }
970
copy_fixed_cells(const std::string & name,uint64_t stride,const std::vector<ResultCellSlab> * result_cell_slabs,std::vector<size_t> * fixed_cs_partitions)971 Status ReaderBase::copy_fixed_cells(
972 const std::string& name,
973 uint64_t stride,
974 const std::vector<ResultCellSlab>* result_cell_slabs,
975 std::vector<size_t>* fixed_cs_partitions) {
976 auto stat_type = (array_schema_->is_attr(name)) ? "copy_fixed_attr_values" :
977 "copy_fixed_coords";
978 auto timer_se = stats_->start_timer(stat_type);
979
980 if (result_cell_slabs->empty()) {
981 zero_out_buffer_sizes();
982 return Status::Ok();
983 }
984
985 auto it = buffers_.find(name);
986 auto buffer_size = it->second.buffer_size_;
987 auto cell_size = array_schema_->cell_size(name);
988
989 // Precompute the cell range destination offsets in the buffer.
990 uint64_t buffer_offset = 0;
991 auto size = std::min<uint64_t>(result_cell_slabs->size(), copy_end_.first);
992 std::vector<uint64_t> cs_offsets(size);
993 for (uint64_t i = 0; i < cs_offsets.size(); i++) {
994 const auto& cs = result_cell_slabs->at(i);
995
996 auto cs_length = cs.length_;
997 if (i == copy_end_.first - 1) {
998 cs_length = copy_end_.second;
999 }
1000
1001 auto bytes_to_copy = cs_length * cell_size;
1002 cs_offsets[i] = buffer_offset;
1003 buffer_offset += bytes_to_copy;
1004 }
1005
1006 // Handle overflow.
1007 if (buffer_offset > *buffer_size) {
1008 copy_overflowed_ = true;
1009 return Status::Ok();
1010 }
1011
1012 // Copy result cell slabs in parallel.
1013 std::function<Status(size_t)> copy_fn = std::bind(
1014 &ReaderBase::copy_partitioned_fixed_cells,
1015 this,
1016 std::placeholders::_1,
1017 &name,
1018 stride,
1019 result_cell_slabs,
1020 &cs_offsets,
1021 fixed_cs_partitions);
1022 auto status = parallel_for(
1023 storage_manager_->compute_tp(),
1024 0,
1025 fixed_cs_partitions->size(),
1026 std::move(copy_fn));
1027
1028 RETURN_NOT_OK(status);
1029
1030 // Update buffer offsets
1031 *(buffers_[name].buffer_size_) = buffer_offset;
1032 if (array_schema_->is_nullable(name)) {
1033 *(buffers_[name].validity_vector_.buffer_size()) =
1034 (buffer_offset / cell_size) * constants::cell_validity_size;
1035 }
1036
1037 return Status::Ok();
1038 }
1039
compute_fixed_cs_partitions(const std::vector<ResultCellSlab> * result_cell_slabs,std::vector<size_t> * fixed_cs_partitions)1040 void ReaderBase::compute_fixed_cs_partitions(
1041 const std::vector<ResultCellSlab>* result_cell_slabs,
1042 std::vector<size_t>* fixed_cs_partitions) {
1043 if (result_cell_slabs->empty()) {
1044 return;
1045 }
1046
1047 const int num_copy_threads =
1048 storage_manager_->compute_tp()->concurrency_level();
1049
1050 // Calculate the partition sizes.
1051 auto num_cs = std::min<uint64_t>(result_cell_slabs->size(), copy_end_.first);
1052 const uint64_t num_cs_partitions =
1053 std::min<uint64_t>(num_copy_threads, num_cs);
1054 const uint64_t cs_per_partition = num_cs / num_cs_partitions;
1055 const uint64_t cs_per_partition_carry = num_cs % num_cs_partitions;
1056
1057 // Calculate the partition offsets.
1058 uint64_t num_cs_partitioned = 0;
1059 fixed_cs_partitions->reserve(num_cs_partitions);
1060 for (uint64_t i = 0; i < num_cs_partitions; ++i) {
1061 const uint64_t num_cs_in_partition =
1062 cs_per_partition + ((i < cs_per_partition_carry) ? 1 : 0);
1063 num_cs_partitioned += num_cs_in_partition;
1064 fixed_cs_partitions->emplace_back(num_cs_partitioned);
1065 }
1066 }
1067
offsets_bytesize() const1068 uint64_t ReaderBase::offsets_bytesize() const {
1069 assert(offsets_bitsize_ == 32 || offsets_bitsize_ == 64);
1070 return offsets_bitsize_ == 32 ? sizeof(uint32_t) :
1071 constants::cell_var_offset_size;
1072 }
1073
/**
 * Copies one partition of fixed-sized result cells into the user buffer
 * for field `name`. Intended to run as one task of a `parallel_for` over
 * the partitions produced by `compute_fixed_cs_partitions`.
 *
 * @param partition_idx Index of the partition to copy.
 * @param name The field (attribute/dimension) to copy.
 * @param stride Cell stride within a slab; `UINT64_MAX` means contiguous.
 * @param result_cell_slabs All cell slabs in the result.
 * @param cs_offsets Precomputed destination byte offset per cell slab.
 * @param cs_partitions Exclusive ending slab index per partition.
 * @return Status
 */
Status ReaderBase::copy_partitioned_fixed_cells(
    const size_t partition_idx,
    const std::string* const name,
    const uint64_t stride,
    const std::vector<ResultCellSlab>* const result_cell_slabs,
    const std::vector<uint64_t>* cs_offsets,
    const std::vector<size_t>* cs_partitions) {
  assert(name);
  assert(result_cell_slabs);

  // For easy reference.
  auto nullable = array_schema_->is_nullable(*name);
  auto it = buffers_.find(*name);
  auto buffer = (unsigned char*)it->second.buffer_;
  auto buffer_validity = (unsigned char*)it->second.validity_vector_.buffer();
  auto cell_size = array_schema_->cell_size(*name);
  // Dimensions have no fill value; only attributes do.
  ByteVecValue fill_value;
  uint8_t fill_value_validity = 0;
  if (array_schema_->is_attr(*name)) {
    fill_value = array_schema_->attribute(*name)->fill_value();
    fill_value_validity =
        array_schema_->attribute(*name)->fill_value_validity();
  }
  uint64_t fill_value_size = (uint64_t)fill_value.size();

  // Calculate the partition to operate on: [cs_idx_start, cs_idx_end).
  const uint64_t cs_idx_start =
      partition_idx == 0 ? 0 : cs_partitions->at(partition_idx - 1);
  const uint64_t cs_idx_end = cs_partitions->at(partition_idx);

  // Copy the cells.
  for (uint64_t cs_idx = cs_idx_start; cs_idx < cs_idx_end; ++cs_idx) {
    // If there was an overflow and we fixed it, don't copy past the new
    // result cell slabs.
    if (cs_idx >= copy_end_.first) {
      break;
    }

    const auto& cs = (*result_cell_slabs)[cs_idx];
    uint64_t offset = cs_offsets->at(cs_idx);

    // The last copied slab may be truncated by a previous overflow fix.
    auto cs_length = cs.length_;
    if (cs_idx == copy_end_.first - 1) {
      cs_length = copy_end_.second;
    }

    // Copy

    // First we check if this is an older (pre TileDB 2.0) array with zipped
    // coordinates and the user has requested split buffers; if so we should
    // proceed to copying the tile. If not, and there is no tile or the tile
    // is empty for the field, then this is a read of an older fragment in
    // schema evolution. In that case we want to set the field to fill values
    // for this tile.
    const bool split_buffer_for_zipped_coords =
        array_schema_->is_dim(*name) && cs.tile_->stores_zipped_coords();
    if ((cs.tile_ == nullptr || cs.tile_->tile_tuple(*name) == nullptr) &&
        !split_buffer_for_zipped_coords) {  // Empty range or attribute added
                                            // in schema evolution
      // Fill the destination range with copies of the fill value (and,
      // for nullable attributes, the fill validity byte).
      auto bytes_to_copy = cs_length * cell_size;
      auto fill_num = bytes_to_copy / fill_value_size;
      for (uint64_t j = 0; j < fill_num; ++j) {
        std::memcpy(buffer + offset, fill_value.data(), fill_value_size);
        if (nullable) {
          std::memset(
              buffer_validity +
                  (offset / cell_size * constants::cell_validity_size),
              fill_value_validity,
              constants::cell_validity_size);
        }
        offset += fill_value_size;
      }
    } else {  // Non-empty range
      if (stride == UINT64_MAX) {
        // Contiguous cells: copy the whole slab in one read.
        if (!nullable)
          RETURN_NOT_OK(
              cs.tile_->read(*name, buffer, offset, cs.start_, cs_length));
        else
          RETURN_NOT_OK(cs.tile_->read_nullable(
              *name, buffer, offset, cs.start_, cs_length, buffer_validity));
      } else {
        // Strided cells: copy one cell at a time, advancing the source
        // position by `stride` and the destination by one cell.
        auto cell_offset = offset;
        auto start = cs.start_;
        for (uint64_t j = 0; j < cs_length; ++j) {
          if (!nullable)
            RETURN_NOT_OK(cs.tile_->read(*name, buffer, cell_offset, start, 1));
          else
            RETURN_NOT_OK(cs.tile_->read_nullable(
                *name, buffer, cell_offset, start, 1, buffer_validity));
          cell_offset += cell_size;
          start += stride;
        }
      }
    }
  }

  return Status::Ok();
}
1172
copy_var_cells(const std::string & name,const uint64_t stride,std::vector<ResultCellSlab> * result_cell_slabs,std::vector<std::pair<size_t,size_t>> * var_cs_partitions,size_t total_cs_length)1173 Status ReaderBase::copy_var_cells(
1174 const std::string& name,
1175 const uint64_t stride,
1176 std::vector<ResultCellSlab>* result_cell_slabs,
1177 std::vector<std::pair<size_t, size_t>>* var_cs_partitions,
1178 size_t total_cs_length) {
1179 auto stat_type = (array_schema_->is_attr(name)) ? "copy_var_attr_values" :
1180 "copy_var_coords";
1181 auto timer_se = stats_->start_timer(stat_type);
1182
1183 if (result_cell_slabs->empty()) {
1184 zero_out_buffer_sizes();
1185 return Status::Ok();
1186 }
1187
1188 std::vector<uint64_t> offset_offsets_per_cs(total_cs_length);
1189 std::vector<uint64_t> var_offsets_per_cs(total_cs_length);
1190
1191 // Compute the destinations of offsets and var-len data in the buffers.
1192 uint64_t total_offset_size, total_var_size, total_validity_size;
1193 RETURN_NOT_OK(compute_var_cell_destinations(
1194 name,
1195 stride,
1196 result_cell_slabs,
1197 &offset_offsets_per_cs,
1198 &var_offsets_per_cs,
1199 &total_offset_size,
1200 &total_var_size,
1201 &total_validity_size));
1202
1203 // Check for overflow and return early (without copying) in that case.
1204 if (copy_overflowed_) {
1205 return Status::Ok();
1206 }
1207
1208 // Copy result cell slabs in parallel
1209 std::function<Status(size_t)> copy_fn = std::bind(
1210 &ReaderBase::copy_partitioned_var_cells,
1211 this,
1212 std::placeholders::_1,
1213 &name,
1214 stride,
1215 result_cell_slabs,
1216 &offset_offsets_per_cs,
1217 &var_offsets_per_cs,
1218 var_cs_partitions);
1219 auto status = parallel_for(
1220 storage_manager_->compute_tp(), 0, var_cs_partitions->size(), copy_fn);
1221
1222 RETURN_NOT_OK(status);
1223
1224 // Update buffer offsets
1225 *(buffers_[name].buffer_size_) = total_offset_size;
1226 *(buffers_[name].buffer_var_size_) = total_var_size;
1227 if (array_schema_->is_nullable(name))
1228 *(buffers_[name].validity_vector_.buffer_size()) = total_validity_size;
1229
1230 return Status::Ok();
1231 }
1232
compute_var_cs_partitions(const std::vector<ResultCellSlab> * result_cell_slabs,std::vector<std::pair<size_t,size_t>> * var_cs_partitions,size_t * total_var_cs_length)1233 void ReaderBase::compute_var_cs_partitions(
1234 const std::vector<ResultCellSlab>* result_cell_slabs,
1235 std::vector<std::pair<size_t, size_t>>* var_cs_partitions,
1236 size_t* total_var_cs_length) {
1237 if (result_cell_slabs->empty()) {
1238 return;
1239 }
1240
1241 const int num_copy_threads =
1242 storage_manager_->compute_tp()->concurrency_level();
1243
1244 // Calculate the partition range.
1245 const uint64_t num_cs =
1246 std::min<uint64_t>(result_cell_slabs->size(), copy_end_.first);
1247 const uint64_t num_cs_partitions =
1248 std::min<uint64_t>(num_copy_threads, num_cs);
1249 const uint64_t cs_per_partition = num_cs / num_cs_partitions;
1250 const uint64_t cs_per_partition_carry = num_cs % num_cs_partitions;
1251
1252 // Compute the boundary between each partition. Each boundary
1253 // is represented by an `std::pair` that contains the total
1254 // length of each cell slab in the leading partition and an
1255 // exclusive cell slab index that ends the partition.
1256 uint64_t next_partition_idx = cs_per_partition;
1257 if (cs_per_partition_carry > 0)
1258 ++next_partition_idx;
1259
1260 *total_var_cs_length = 0;
1261 var_cs_partitions->reserve(num_cs_partitions);
1262 for (uint64_t cs_idx = 0; cs_idx < num_cs; cs_idx++) {
1263 if (cs_idx == next_partition_idx) {
1264 var_cs_partitions->emplace_back(*total_var_cs_length, cs_idx);
1265
1266 // The final partition may contain extra cell slabs that did
1267 // not evenly divide into the partition range. Set the
1268 // `next_partition_idx` to zero and build the last boundary
1269 // after this for-loop.
1270 if (var_cs_partitions->size() == num_cs_partitions) {
1271 next_partition_idx = 0;
1272 } else {
1273 next_partition_idx += cs_per_partition;
1274 if (cs_idx < (cs_per_partition_carry - 1))
1275 ++next_partition_idx;
1276 }
1277 }
1278
1279 auto cs_length = result_cell_slabs->at(cs_idx).length_;
1280 if (cs_idx == copy_end_.first - 1) {
1281 cs_length = copy_end_.second;
1282 }
1283 *total_var_cs_length += cs_length;
1284 }
1285
1286 // Store the final boundary.
1287 var_cs_partitions->emplace_back(*total_var_cs_length, num_cs);
1288 }
1289
/**
 * Computes, for each cell to be copied for var-sized field `name`, its
 * destination offsets in the user's offsets buffer and var-data buffer,
 * and the total sizes that will be written. On detecting that the user
 * buffers cannot hold the result, either truncates `copy_end_` (when
 * `fix_var_sized_overflows_` is set) or sets `copy_overflowed_`, then
 * returns early.
 *
 * @param name The var-sized field being copied.
 * @param stride Cell stride within a slab; `UINT64_MAX` means contiguous.
 * @param result_cell_slabs All cell slabs in the result.
 * @param offset_offsets_per_cs Output per-cell offsets-buffer destinations.
 * @param var_offsets_per_cs Output per-cell var-buffer destinations.
 * @param total_offset_size Output total bytes of offsets to write.
 * @param total_var_size Output total bytes of var data to write.
 * @param total_validity_size Output total bytes of validity to write.
 * @return Status
 */
Status ReaderBase::compute_var_cell_destinations(
    const std::string& name,
    uint64_t stride,
    std::vector<ResultCellSlab>* result_cell_slabs,
    std::vector<uint64_t>* offset_offsets_per_cs,
    std::vector<uint64_t>* var_offsets_per_cs,
    uint64_t* total_offset_size,
    uint64_t* total_var_size,
    uint64_t* total_validity_size) {
  // For easy reference
  auto nullable = array_schema_->is_nullable(name);
  auto num_cs = std::min<uint64_t>(result_cell_slabs->size(), copy_end_.first);
  auto offset_size = offsets_bytesize();
  // Dimensions have no fill value; only attributes do.
  ByteVecValue fill_value;
  if (array_schema_->is_attr(name))
    fill_value = array_schema_->attribute(name)->fill_value();
  auto fill_value_size = (uint64_t)fill_value.size();

  auto it = buffers_.find(name);
  auto buffer_size = *it->second.buffer_size_;
  auto buffer_var_size = *it->second.buffer_var_size_;
  auto buffer_validity_size = it->second.validity_vector_.buffer_size();

  // Reserve room for the trailing extra offset, if configured.
  if (offsets_extra_element_)
    buffer_size -= offset_size;

  // Compute the destinations for all result cell slabs
  *total_offset_size = 0;
  *total_var_size = 0;
  *total_validity_size = 0;
  size_t total_cs_length = 0;
  for (uint64_t cs_idx = 0; cs_idx < num_cs; cs_idx++) {
    const auto& cs = result_cell_slabs->at(cs_idx);

    // The last copied slab may be truncated by a previous overflow fix.
    auto cs_length = cs.length_;
    if (cs_idx == copy_end_.first - 1) {
      cs_length = copy_end_.second;
    }

    // Get tile information, if the range is nonempty.
    uint64_t* tile_offsets = nullptr;
    uint64_t tile_cell_num = 0;
    uint64_t tile_var_size = 0;
    if (cs.tile_ != nullptr && cs.tile_->tile_tuple(name) != nullptr) {
      const auto tile_tuple = cs.tile_->tile_tuple(name);
      const auto& tile = std::get<0>(*tile_tuple);
      const auto& tile_var = std::get<1>(*tile_tuple);

      // Get the internal buffer to the offset values.
      Buffer* const buffer = tile.buffer();

      tile_offsets = (uint64_t*)buffer->data();
      tile_cell_num = tile.cell_num();
      tile_var_size = tile_var.size();
    }

    // Compute the destinations for each cell in the range.
    uint64_t dest_vec_idx = 0;
    stride = (stride == UINT64_MAX) ? 1 : stride;

    for (auto cell_idx = cs.start_; dest_vec_idx < cs_length;
         cell_idx += stride, dest_vec_idx++) {
      // Get size of variable-sized cell. For an empty range, the fill
      // value will be written; for the last cell of a tile, the size is
      // derived from the total var size rather than the next offset.
      uint64_t cell_var_size = 0;
      if (cs.tile_ == nullptr || cs.tile_->tile_tuple(name) == nullptr) {
        cell_var_size = fill_value_size;
      } else {
        cell_var_size =
            (cell_idx != tile_cell_num - 1) ?
                tile_offsets[cell_idx + 1] - tile_offsets[cell_idx] :
                tile_var_size - (tile_offsets[cell_idx] - tile_offsets[0]);
      }

      // Would this cell overflow the offsets, var-data, or validity
      // buffer?
      if (*total_offset_size + offset_size > buffer_size ||
          *total_var_size + cell_var_size > buffer_var_size ||
          (buffer_validity_size &&
           *total_validity_size + constants::cell_validity_size >
               *buffer_validity_size)) {
        // Try to fix the overflow by reducing the result cell slabs to copy.
        if (fix_var_sized_overflows_) {
          // NOTE(review): with stride > 1, `cell_idx - cs.start_` equals
          // dest_vec_idx * stride, not the number of cells copied — this
          // looks like it would overstate the truncated slab length for
          // strided copies; confirm whether overflow fixing and strides
          // can co-occur.
          copy_end_ =
              std::pair<uint64_t, uint64_t>(cs_idx + 1, cell_idx - cs.start_);

          // Cannot even copy one cell, return overflow.
          if (cs_idx == 0 && cell_idx == result_cell_slabs->front().start_) {
            copy_overflowed_ = true;
          }
        } else {
          copy_overflowed_ = true;
        }

        // In case an extra offset is configured, we need to account memory for
        // it on each read
        *total_offset_size += offsets_extra_element_ ? offset_size : 0;

        return Status::Ok();
      }

      // Record destination offsets.
      (*offset_offsets_per_cs)[total_cs_length + dest_vec_idx] =
          *total_offset_size;
      (*var_offsets_per_cs)[total_cs_length + dest_vec_idx] = *total_var_size;
      *total_offset_size += offset_size;
      *total_var_size += cell_var_size;
      if (nullable)
        *total_validity_size += constants::cell_validity_size;
    }

    total_cs_length += cs_length;
  }

  // In case an extra offset is configured, we need to account memory for it on
  // each read
  *total_offset_size += offsets_extra_element_ ? offset_size : 0;

  return Status::Ok();
}
1407
/**
 * Copies one partition of var-sized result cells into the user buffers
 * (offsets, var data and, if nullable, validity) for field `name`.
 * Intended to run as one task of a `parallel_for` over the partitions
 * produced by `compute_var_cs_partitions`.
 *
 * @param partition_idx Index of the partition to copy.
 * @param name The var-sized field to copy.
 * @param stride Cell stride within a slab; `UINT64_MAX` means contiguous.
 * @param result_cell_slabs All cell slabs in the result.
 * @param offset_offsets_per_cs Per-cell offsets-buffer destinations.
 * @param var_offsets_per_cs Per-cell var-buffer destinations.
 * @param cs_partitions Partition boundaries (aggregate length, end index).
 * @return Status
 */
Status ReaderBase::copy_partitioned_var_cells(
    const size_t partition_idx,
    const std::string* const name,
    uint64_t stride,
    const std::vector<ResultCellSlab>* const result_cell_slabs,
    const std::vector<uint64_t>* const offset_offsets_per_cs,
    const std::vector<uint64_t>* const var_offsets_per_cs,
    const std::vector<std::pair<size_t, size_t>>* const cs_partitions) {
  assert(name);
  assert(result_cell_slabs);

  auto it = buffers_.find(*name);
  auto nullable = array_schema_->is_nullable(*name);
  auto buffer = (unsigned char*)it->second.buffer_;
  auto buffer_var = (unsigned char*)it->second.buffer_var_;
  auto buffer_validity = (unsigned char*)it->second.validity_vector_.buffer();
  auto offset_size = offsets_bytesize();
  // Dimensions have no fill value; only attributes do.
  ByteVecValue fill_value;
  uint8_t fill_value_validity = 0;
  if (array_schema_->is_attr(*name)) {
    fill_value = array_schema_->attribute(*name)->fill_value();
    fill_value_validity =
        array_schema_->attribute(*name)->fill_value_validity();
  }
  auto fill_value_size = (uint64_t)fill_value.size();
  auto attr_datatype_size = datatype_size(array_schema_->type(*name));

  // Fetch the starting array offset into both `offset_offsets_per_cs`
  // and `var_offsets_per_cs`.
  size_t arr_offset =
      partition_idx == 0 ? 0 : (*cs_partitions)[partition_idx - 1].first;

  // Fetch the inclusive starting cell slab index and the exclusive ending
  // cell slab index.
  const size_t start_cs_idx =
      partition_idx == 0 ? 0 : (*cs_partitions)[partition_idx - 1].second;
  const size_t end_cs_idx = (*cs_partitions)[partition_idx].second;

  // Copy all cells within the range of cell slabs.
  for (uint64_t cs_idx = start_cs_idx; cs_idx < end_cs_idx; ++cs_idx) {
    // If there was an overflow and we fixed it, don't copy past the new
    // result cell slabs.
    if (cs_idx >= copy_end_.first) {
      break;
    }
    const auto& cs = (*result_cell_slabs)[cs_idx];

    // The last copied slab may be truncated by a previous overflow fix.
    auto cs_length = cs.length_;
    if (cs_idx == copy_end_.first - 1) {
      cs_length = copy_end_.second;
    }

    // Get tile information, if the range is nonempty.
    uint64_t* tile_offsets = nullptr;
    Tile* tile_var = nullptr;
    Tile* tile_validity = nullptr;
    uint64_t tile_cell_num = 0;
    if (cs.tile_ != nullptr && cs.tile_->tile_tuple(*name) != nullptr) {
      const auto tile_tuple = cs.tile_->tile_tuple(*name);
      Tile* const tile = &std::get<0>(*tile_tuple);
      tile_var = &std::get<1>(*tile_tuple);
      tile_validity = &std::get<2>(*tile_tuple);

      // Get the internal buffer to the offset values.
      Buffer* const buffer = tile->buffer();

      tile_offsets = (uint64_t*)buffer->data();
      tile_cell_num = tile->cell_num();
    }

    // Copy each cell in the range
    uint64_t dest_vec_idx = 0;
    stride = (stride == UINT64_MAX) ? 1 : stride;
    for (auto cell_idx = cs.start_; dest_vec_idx < cs_length;
         cell_idx += stride, dest_vec_idx++) {
      // Destination pointers for this cell; `offset_offsets / offset_size`
      // is the cell's ordinal position, used to index the validity buffer.
      auto offset_offsets = (*offset_offsets_per_cs)[arr_offset + dest_vec_idx];
      auto offset_dest = buffer + offset_offsets;
      auto var_offset = (*var_offsets_per_cs)[arr_offset + dest_vec_idx];
      auto var_dest = buffer_var + var_offset;
      auto validity_dest = buffer_validity + (offset_offsets / offset_size);

      // In "elements" offsets mode, offsets are reported in units of the
      // attribute datatype rather than bytes.
      if (offsets_format_mode_ == "elements") {
        var_offset = var_offset / attr_datatype_size;
      }

      // Copy offset
      std::memcpy(offset_dest, &var_offset, offset_size);

      // Copy variable-sized value
      if (cs.tile_ == nullptr || cs.tile_->tile_tuple(*name) == nullptr) {
        // Empty range: write the fill value (and fill validity).
        std::memcpy(var_dest, fill_value.data(), fill_value_size);
        if (nullable)
          std::memset(
              validity_dest,
              fill_value_validity,
              constants::cell_validity_size);
      } else {
        // The last cell's size is derived from the total var size rather
        // than the next offset.
        const uint64_t cell_var_size =
            (cell_idx != tile_cell_num - 1) ?
                tile_offsets[cell_idx + 1] - tile_offsets[cell_idx] :
                tile_var->size() - (tile_offsets[cell_idx] - tile_offsets[0]);
        const uint64_t tile_var_offset =
            tile_offsets[cell_idx] - tile_offsets[0];

        RETURN_NOT_OK(tile_var->read(var_dest, cell_var_size, tile_var_offset));

        if (nullable)
          RETURN_NOT_OK(tile_validity->read(
              validity_dest, constants::cell_validity_size, cell_idx));
      }
    }

    arr_offset += cs_length;
  }

  return Status::Ok();
}
1525
/**
 * Reads, unfilters and/or copies the tiles of the given field names,
 * according to each name's `ProcessTileFlags`. Optionally enforces a
 * memory budget by shrinking the set of result tiles and truncating
 * `copy_end_` accordingly.
 *
 * @param names Map of field name to READ/COPY flags.
 * @param result_tiles The tiles to process (may be shrunk for budget).
 * @param result_cell_slabs The cell slabs to copy.
 * @param subarray The subarray used to load tile offsets.
 * @param stride Cell stride within a slab; `UINT64_MAX` means contiguous.
 * @param memory_budget Tile memory budget; `uint64_t` max disables it.
 * @param memory_used_for_tiles Output of tile memory used (query
 *     condition path only).
 * @return Status
 */
Status ReaderBase::process_tiles(
    const std::unordered_map<std::string, ProcessTileFlags>* names,
    std::vector<ResultTile*>* result_tiles,
    std::vector<ResultCellSlab>* result_cell_slabs,
    Subarray* subarray,
    const uint64_t stride,
    uint64_t memory_budget,
    uint64_t* memory_used_for_tiles) {
  // If a name needs to be read, we put it on `read_names` vector (it may
  // contain other flags). Otherwise, we put the name on the `copy_names`
  // vector if it needs to be copied back to the user buffer.
  // We can benefit from concurrent reads by processing `read_names`
  // separately from `copy_names`.
  std::vector<std::string> read_names;
  std::vector<std::string> copy_names;
  // True only when every name is READ-only, i.e. this call comes from
  // `apply_query_condition`.
  bool is_apply_query_condition = true;
  read_names.reserve(names->size());
  for (const auto& name_pair : *names) {
    const std::string name = name_pair.first;
    const ProcessTileFlags flags = name_pair.second;
    if (flags & ProcessTileFlag::READ) {
      if (flags & ProcessTileFlag::COPY)
        is_apply_query_condition = false;
      read_names.push_back(name);
    } else if (flags & ProcessTileFlag::COPY) {
      copy_names.push_back(name);
      is_apply_query_condition = false;
    }
  }

  // Pre-load all attribute offsets into memory for attributes
  // to be read.
  RETURN_NOT_OK(load_tile_offsets(subarray, &read_names));

  // Respect the memory budget if it is set.
  if (memory_budget != std::numeric_limits<uint64_t>::max()) {
    // For query condition, make sure all tiles fit in memory budget.
    if (is_apply_query_condition) {
      for (const auto& name_pair : *names) {
        const std::string name = name_pair.first;
        assert(name_pair.second == ProcessTileFlag::READ);

        for (const auto& rt : *result_tiles) {
          uint64_t tile_size = 0;
          RETURN_NOT_OK(get_attribute_tile_size(
              name, rt->frag_idx(), rt->tile_idx(), &tile_size));
          *memory_used_for_tiles += tile_size;
        }
      }

      if (*memory_used_for_tiles > memory_budget) {
        return Status::ReaderError(
            "Exceeded tile memory budget applying query condition");
      }
    } else {
      // Make sure that for each attribute, all tiles can fit in the budget.
      // `new_result_tile_size` ends up as the largest prefix of tiles that
      // fits for every attribute.
      uint64_t new_result_tile_size = result_tiles->size();
      for (const auto& name_pair : *names) {
        uint64_t mem_usage = 0;
        for (uint64_t i = 0;
             i < result_tiles->size() && i < new_result_tile_size;
             i++) {
          const auto& rt = result_tiles->at(i);
          uint64_t tile_size = 0;
          RETURN_NOT_OK(get_attribute_tile_size(
              name_pair.first, rt->frag_idx(), rt->tile_idx(), &tile_size));
          if (mem_usage + tile_size > memory_budget) {
            new_result_tile_size = i;
            break;
          }
          mem_usage += tile_size;
        }
      }

      if (new_result_tile_size != result_tiles->size()) {
        // Erase tiles from result tiles.
        // NOTE(review): erasing from `new_result_tile_size - 1` also drops
        // the last tile that *did* fit in the budget (conservative), and
        // if `new_result_tile_size` is 0 the expression `begin() - 1` and
        // the `back()` call below are undefined behavior — confirm whether
        // callers guarantee the first tile always fits.
        result_tiles->erase(
            result_tiles->begin() + new_result_tile_size - 1,
            result_tiles->end());

        // Find the result cell slab index to end the copy operation.
        auto& last_tile = result_tiles->back();
        uint64_t last_idx = 0;
        for (uint64_t i = 0; i < result_cell_slabs->size(); i++) {
          if (result_cell_slabs->at(i).tile_ == last_tile) {
            if (i == 0) {
              return Status::ReaderError(
                  "Unable to copy one tile with current budget");
            }
            last_idx = i;
          }
        }

        // Adjust copy_end_
        if (copy_end_.first > last_idx + 1) {
          copy_end_.first = last_idx + 1;
          copy_end_.second = result_cell_slabs->at(last_idx).length_;
        }
      }
    }
  }

  // Get the maximum number of attributes to read and unfilter in parallel.
  // Each attribute requires additional memory to buffer reads into
  // before copying them back into `buffers_`. Cells must be copied
  // before moving onto the next set of concurrent reads to prevent
  // bloating memory. Additionally, the copy cells paths are performed
  // in serial, which will bottleneck the read concurrency. Increasing
  // this number will have diminishing returns on performance.
  const uint64_t concurrent_reads = constants::concurrent_attr_reads;

  // Instantiate partitions for copying fixed and variable cells.
  std::vector<size_t> fixed_cs_partitions;
  compute_fixed_cs_partitions(result_cell_slabs, &fixed_cs_partitions);

  std::vector<std::pair<size_t, size_t>> var_cs_partitions;
  size_t total_var_cs_length;
  compute_var_cs_partitions(
      result_cell_slabs, &var_cs_partitions, &total_var_cs_length);

  // Handle attribute/dimensions that need to be copied but do
  // not need to be read.
  for (const auto& copy_name : copy_names) {
    if (!array_schema_->var_size(copy_name))
      RETURN_CANCEL_OR_ERROR(copy_fixed_cells(
          copy_name, stride, result_cell_slabs, &fixed_cs_partitions));
    else
      RETURN_CANCEL_OR_ERROR(copy_var_cells(
          copy_name,
          stride,
          result_cell_slabs,
          &var_cs_partitions,
          total_var_cs_length));

    // Copy only here should be attributes in the query condition. They should
    // be treated as coordinates.
    if (clear_coords_tiles_on_copy_)
      clear_tiles(copy_name, result_tiles);
  }

  // Iterate through all of the attribute names. This loop
  // will read, unfilter, and copy tiles back into the `buffers_`.
  uint64_t idx = 0;
  tdb_unique_ptr<ResultCellSlabsIndex> rcs_index = nullptr;
  while (idx < read_names.size()) {
    // We will perform `concurrent_reads` unless we have a smaller
    // number of remaining attributes to process.
    const uint64_t num_reads =
        std::min(concurrent_reads, read_names.size() - idx);

    // Build a vector of the attribute names to process.
    std::vector<std::string> inner_names(
        read_names.begin() + idx, read_names.begin() + idx + num_reads);

    // Read the tiles for the names in `inner_names`. Each attribute
    // name will be read concurrently.
    RETURN_CANCEL_OR_ERROR(read_attribute_tiles(&inner_names, result_tiles));

    // Copy the cells into the associated `buffers_`, and then clear the cells
    // from the tiles. The cell copies are not thread safe. Clearing tiles are
    // thread safe, but quick enough that they do not justify scheduling on
    // separate threads.
    for (const auto& inner_name : inner_names) {
      const ProcessTileFlags flags = names->at(inner_name);

      RETURN_CANCEL_OR_ERROR(unfilter_tiles(inner_name, result_tiles));

      if (flags & ProcessTileFlag::COPY) {
        if (!array_schema_->var_size(inner_name)) {
          RETURN_CANCEL_OR_ERROR(copy_fixed_cells(
              inner_name, stride, result_cell_slabs, &fixed_cs_partitions));
        } else {
          RETURN_CANCEL_OR_ERROR(copy_var_cells(
              inner_name,
              stride,
              result_cell_slabs,
              &var_cs_partitions,
              total_var_cs_length));
        }
        clear_tiles(inner_name, result_tiles);
      }
    }

    idx += inner_names.size();
  }

  return Status::Ok();
}
1714
compute_rcs_index(const std::vector<ResultCellSlab> * result_cell_slabs) const1715 tdb_unique_ptr<ReaderBase::ResultCellSlabsIndex> ReaderBase::compute_rcs_index(
1716 const std::vector<ResultCellSlab>* result_cell_slabs) const {
1717 // Build an association from the result tile to the cell slab ranges
1718 // that it contains.
1719 tdb_unique_ptr<ReaderBase::ResultCellSlabsIndex> rcs_index =
1720 tdb_unique_ptr<ReaderBase::ResultCellSlabsIndex>(
1721 tdb_new(ResultCellSlabsIndex));
1722
1723 std::vector<ResultCellSlab>::const_iterator it = result_cell_slabs->cbegin();
1724 while (it != result_cell_slabs->cend()) {
1725 std::pair<uint64_t, uint64_t> range =
1726 std::make_pair(it->start_, it->start_ + it->length_);
1727 if (rcs_index->find(it->tile_) == rcs_index->end()) {
1728 std::vector<std::pair<uint64_t, uint64_t>> ranges(1, std::move(range));
1729 rcs_index->insert(std::make_pair(it->tile_, std::move(ranges)));
1730 } else {
1731 (*rcs_index)[it->tile_].emplace_back(std::move(range));
1732 }
1733 ++it;
1734 }
1735
1736 return rcs_index;
1737 }
1738
apply_query_condition(std::vector<ResultCellSlab> * const result_cell_slabs,std::vector<ResultTile * > * result_tiles,Subarray * subarray,uint64_t stride,uint64_t memory_budget_rcs,uint64_t memory_budget_tiles,uint64_t * memory_used_for_tiles)1739 Status ReaderBase::apply_query_condition(
1740 std::vector<ResultCellSlab>* const result_cell_slabs,
1741 std::vector<ResultTile*>* result_tiles,
1742 Subarray* subarray,
1743 uint64_t stride,
1744 uint64_t memory_budget_rcs,
1745 uint64_t memory_budget_tiles,
1746 uint64_t* memory_used_for_tiles) {
1747 if (condition_.empty() || result_cell_slabs->empty())
1748 return Status::Ok();
1749
1750 // To evaluate the query condition, we need to read tiles for the
1751 // attributes used in the query condition. Build a map of attribute
1752 // names to read.
1753 const std::unordered_set<std::string>& condition_names =
1754 condition_.field_names();
1755 std::unordered_map<std::string, ProcessTileFlags> names;
1756 for (const auto& condition_name : condition_names) {
1757 names[condition_name] = ProcessTileFlag::READ;
1758 }
1759
1760 // Each element in `names` has been flagged with `ProcessTileFlag::READ`.
1761 // This will read the tiles, but will not copy them into the user buffers.
1762 RETURN_NOT_OK(process_tiles(
1763 &names,
1764 result_tiles,
1765 result_cell_slabs,
1766 subarray,
1767 stride,
1768 memory_budget_tiles,
1769 memory_used_for_tiles));
1770
1771 // The `UINT64_MAX` is a sentinel value to indicate that we do not
1772 // use a stride in the cell index calculation. To simplify our logic,
1773 // assign this to `1`.
1774 if (stride == UINT64_MAX)
1775 stride = 1;
1776
1777 RETURN_NOT_OK(condition_.apply(
1778 array_schema_, result_cell_slabs, stride, memory_budget_rcs));
1779
1780 logger_->debug("Done applying query condition");
1781
1782 return Status::Ok();
1783 }
1784
get_attribute_tile_size(const std::string & name,unsigned f,uint64_t t,uint64_t * tile_size)1785 Status ReaderBase::get_attribute_tile_size(
1786 const std::string& name, unsigned f, uint64_t t, uint64_t* tile_size) {
1787 *tile_size = 0;
1788 *tile_size += fragment_metadata_[f]->tile_size(name, t);
1789
1790 if (array_schema_->var_size(name)) {
1791 uint64_t temp = 0;
1792 RETURN_NOT_OK(fragment_metadata_[f]->tile_var_size(name, t, &temp));
1793 *tile_size += temp;
1794 }
1795
1796 if (array_schema_->is_nullable(name)) {
1797 *tile_size +=
1798 fragment_metadata_[f]->cell_num(t) * constants::cell_validity_size;
1799 }
1800
1801 return Status::Ok();
1802 }
1803
1804 template <class T>
compute_result_space_tiles(const Subarray * subarray,const Subarray * partitioner_subarray,std::map<const T *,ResultSpaceTile<T>> * result_space_tiles) const1805 void ReaderBase::compute_result_space_tiles(
1806 const Subarray* subarray,
1807 const Subarray* partitioner_subarray,
1808 std::map<const T*, ResultSpaceTile<T>>* result_space_tiles) const {
1809 // For easy reference
1810 auto domain = array_schema_->domain()->domain();
1811 auto tile_extents = array_schema_->domain()->tile_extents();
1812 auto tile_order = array_schema_->tile_order();
1813
1814 // Compute fragment tile domains
1815 std::vector<TileDomain<T>> frag_tile_domains;
1816
1817 if (partitioner_subarray->is_set()) {
1818 auto relevant_frags = partitioner_subarray->relevant_fragments();
1819 for (auto it = relevant_frags->rbegin(); it != relevant_frags->rend();
1820 it++) {
1821 if (fragment_metadata_[*it]->dense()) {
1822 frag_tile_domains.emplace_back(
1823 *it,
1824 domain,
1825 fragment_metadata_[*it]->non_empty_domain(),
1826 tile_extents,
1827 tile_order);
1828 }
1829 }
1830 } else {
1831 auto fragment_num = (int)fragment_metadata_.size();
1832 if (fragment_num > 0) {
1833 for (int i = fragment_num - 1; i >= 0; --i) {
1834 if (fragment_metadata_[i]->dense()) {
1835 frag_tile_domains.emplace_back(
1836 i,
1837 domain,
1838 fragment_metadata_[i]->non_empty_domain(),
1839 tile_extents,
1840 tile_order);
1841 }
1842 }
1843 }
1844 }
1845
1846 // Get tile coords and array domain
1847 const auto& tile_coords = subarray->tile_coords();
1848 TileDomain<T> array_tile_domain(
1849 UINT32_MAX, domain, domain, tile_extents, tile_order);
1850
1851 // Compute result space tiles
1852 compute_result_space_tiles<T>(
1853 array_schema_->domain(),
1854 tile_coords,
1855 array_tile_domain,
1856 frag_tile_domains,
1857 result_space_tiles);
1858 }
1859
has_coords() const1860 bool ReaderBase::has_coords() const {
1861 for (const auto& it : buffers_) {
1862 if (it.first == constants::coords || array_schema_->is_dim(it.first))
1863 return true;
1864 }
1865
1866 return false;
1867 }
1868
1869 template <class T>
fill_dense_coords(const Subarray & subarray)1870 Status ReaderBase::fill_dense_coords(const Subarray& subarray) {
1871 auto timer_se = stats_->start_timer("fill_dense_coords");
1872
1873 // Reading coordinates with a query condition is currently unsupported.
1874 // Query conditions mutate the result cell slabs to filter attributes.
1875 // This path does not use result cell slabs, which will fill coordinates
1876 // for cells that should be filtered out.
1877 if (!condition_.empty()) {
1878 return logger_->status(
1879 Status::ReaderError("Cannot read dense coordinates; dense coordinate "
1880 "reads are unsupported with a query condition"));
1881 }
1882
1883 // Prepare buffers
1884 std::vector<unsigned> dim_idx;
1885 std::vector<QueryBuffer*> buffers;
1886 auto coords_it = buffers_.find(constants::coords);
1887 auto dim_num = array_schema_->dim_num();
1888 if (coords_it != buffers_.end()) {
1889 buffers.emplace_back(&(coords_it->second));
1890 dim_idx.emplace_back(dim_num);
1891 } else {
1892 for (unsigned d = 0; d < dim_num; ++d) {
1893 const auto& dim = array_schema_->dimension(d);
1894 auto it = buffers_.find(dim->name());
1895 if (it != buffers_.end()) {
1896 buffers.emplace_back(&(it->second));
1897 dim_idx.emplace_back(d);
1898 }
1899 }
1900 }
1901 std::vector<uint64_t> offsets(buffers.size(), 0);
1902
1903 if (layout_ == Layout::GLOBAL_ORDER) {
1904 RETURN_NOT_OK(
1905 fill_dense_coords_global<T>(subarray, dim_idx, buffers, &offsets));
1906 } else {
1907 assert(layout_ == Layout::ROW_MAJOR || layout_ == Layout::COL_MAJOR);
1908 RETURN_NOT_OK(
1909 fill_dense_coords_row_col<T>(subarray, dim_idx, buffers, &offsets));
1910 }
1911
1912 // Update buffer sizes
1913 for (size_t i = 0; i < buffers.size(); ++i)
1914 *(buffers[i]->buffer_size_) = offsets[i];
1915
1916 return Status::Ok();
1917 }
1918
1919 template <class T>
fill_dense_coords_global(const Subarray & subarray,const std::vector<unsigned> & dim_idx,const std::vector<QueryBuffer * > & buffers,std::vector<uint64_t> * offsets)1920 Status ReaderBase::fill_dense_coords_global(
1921 const Subarray& subarray,
1922 const std::vector<unsigned>& dim_idx,
1923 const std::vector<QueryBuffer*>& buffers,
1924 std::vector<uint64_t>* offsets) {
1925 auto tile_coords = subarray.tile_coords();
1926 auto cell_order = array_schema_->cell_order();
1927
1928 for (const auto& tc : tile_coords) {
1929 auto tile_subarray = subarray.crop_to_tile((const T*)&tc[0], cell_order);
1930 RETURN_NOT_OK(
1931 fill_dense_coords_row_col<T>(tile_subarray, dim_idx, buffers, offsets));
1932 }
1933
1934 return Status::Ok();
1935 }
1936
1937 template <class T>
fill_dense_coords_row_col(const Subarray & subarray,const std::vector<unsigned> & dim_idx,const std::vector<QueryBuffer * > & buffers,std::vector<uint64_t> * offsets)1938 Status ReaderBase::fill_dense_coords_row_col(
1939 const Subarray& subarray,
1940 const std::vector<unsigned>& dim_idx,
1941 const std::vector<QueryBuffer*>& buffers,
1942 std::vector<uint64_t>* offsets) {
1943 auto cell_order = array_schema_->cell_order();
1944 auto dim_num = array_schema_->dim_num();
1945
1946 // Iterate over all coordinates, retrieved in cell slabs
1947 CellSlabIter<T> iter(&subarray);
1948 RETURN_CANCEL_OR_ERROR(iter.begin());
1949 while (!iter.end()) {
1950 auto cell_slab = iter.cell_slab();
1951 auto coords_num = cell_slab.length_;
1952
1953 // Check for overflow
1954 for (size_t i = 0; i < buffers.size(); ++i) {
1955 auto idx = (dim_idx[i] == dim_num) ? 0 : dim_idx[i];
1956 auto dim = array_schema_->domain()->dimension(idx);
1957 auto coord_size = dim->coord_size();
1958 coord_size = (dim_idx[i] == dim_num) ? coord_size * dim_num : coord_size;
1959 auto buff_size = *(buffers[i]->buffer_size_);
1960 auto offset = (*offsets)[i];
1961 if (coords_num * coord_size + offset > buff_size) {
1962 copy_overflowed_ = true;
1963 return Status::Ok();
1964 }
1965 }
1966
1967 // Copy slab
1968 if (layout_ == Layout::ROW_MAJOR ||
1969 (layout_ == Layout::GLOBAL_ORDER && cell_order == Layout::ROW_MAJOR))
1970 fill_dense_coords_row_slab(
1971 &cell_slab.coords_[0], coords_num, dim_idx, buffers, offsets);
1972 else
1973 fill_dense_coords_col_slab(
1974 &cell_slab.coords_[0], coords_num, dim_idx, buffers, offsets);
1975
1976 ++iter;
1977 }
1978
1979 return Status::Ok();
1980 }
1981
1982 template <class T>
fill_dense_coords_row_slab(const T * start,uint64_t num,const std::vector<unsigned> & dim_idx,const std::vector<QueryBuffer * > & buffers,std::vector<uint64_t> * offsets) const1983 void ReaderBase::fill_dense_coords_row_slab(
1984 const T* start,
1985 uint64_t num,
1986 const std::vector<unsigned>& dim_idx,
1987 const std::vector<QueryBuffer*>& buffers,
1988 std::vector<uint64_t>* offsets) const {
1989 // For easy reference
1990 auto dim_num = array_schema_->dim_num();
1991
1992 // Special zipped coordinates
1993 if (dim_idx.size() == 1 && dim_idx[0] == dim_num) {
1994 auto c_buff = (char*)buffers[0]->buffer_;
1995 auto offset = &(*offsets)[0];
1996
1997 // Fill coordinates
1998 for (uint64_t i = 0; i < num; ++i) {
1999 // First dim-1 dimensions are copied as they are
2000 if (dim_num > 1) {
2001 auto bytes_to_copy = (dim_num - 1) * sizeof(T);
2002 std::memcpy(c_buff + *offset, start, bytes_to_copy);
2003 *offset += bytes_to_copy;
2004 }
2005
2006 // Last dimension is incremented by `i`
2007 auto new_coord = start[dim_num - 1] + i;
2008 std::memcpy(c_buff + *offset, &new_coord, sizeof(T));
2009 *offset += sizeof(T);
2010 }
2011 } else { // Set of separate coordinate buffers
2012 for (uint64_t i = 0; i < num; ++i) {
2013 for (size_t b = 0; b < buffers.size(); ++b) {
2014 auto c_buff = (char*)buffers[b]->buffer_;
2015 auto offset = &(*offsets)[b];
2016
2017 // First dim-1 dimensions are copied as they are
2018 if (dim_num > 1 && dim_idx[b] < dim_num - 1) {
2019 std::memcpy(c_buff + *offset, &start[dim_idx[b]], sizeof(T));
2020 *offset += sizeof(T);
2021 } else {
2022 // Last dimension is incremented by `i`
2023 auto new_coord = start[dim_num - 1] + i;
2024 std::memcpy(c_buff + *offset, &new_coord, sizeof(T));
2025 *offset += sizeof(T);
2026 }
2027 }
2028 }
2029 }
2030 }
2031
2032 template <class T>
fill_dense_coords_col_slab(const T * start,uint64_t num,const std::vector<unsigned> & dim_idx,const std::vector<QueryBuffer * > & buffers,std::vector<uint64_t> * offsets) const2033 void ReaderBase::fill_dense_coords_col_slab(
2034 const T* start,
2035 uint64_t num,
2036 const std::vector<unsigned>& dim_idx,
2037 const std::vector<QueryBuffer*>& buffers,
2038 std::vector<uint64_t>* offsets) const {
2039 // For easy reference
2040 auto dim_num = array_schema_->dim_num();
2041
2042 // Special zipped coordinates
2043 if (dim_idx.size() == 1 && dim_idx[0] == dim_num) {
2044 auto c_buff = (char*)buffers[0]->buffer_;
2045 auto offset = &(*offsets)[0];
2046
2047 // Fill coordinates
2048 for (uint64_t i = 0; i < num; ++i) {
2049 // First dimension is incremented by `i`
2050 auto new_coord = start[0] + i;
2051 std::memcpy(c_buff + *offset, &new_coord, sizeof(T));
2052 *offset += sizeof(T);
2053
2054 // Last dim-1 dimensions are copied as they are
2055 if (dim_num > 1) {
2056 auto bytes_to_copy = (dim_num - 1) * sizeof(T);
2057 std::memcpy(c_buff + *offset, &start[1], bytes_to_copy);
2058 *offset += bytes_to_copy;
2059 }
2060 }
2061 } else { // Separate coordinate buffers
2062 for (uint64_t i = 0; i < num; ++i) {
2063 for (size_t b = 0; b < buffers.size(); ++b) {
2064 auto c_buff = (char*)buffers[b]->buffer_;
2065 auto offset = &(*offsets)[b];
2066
2067 // First dimension is incremented by `i`
2068 if (dim_idx[b] == 0) {
2069 auto new_coord = start[0] + i;
2070 std::memcpy(c_buff + *offset, &new_coord, sizeof(T));
2071 *offset += sizeof(T);
2072 } else { // Last dim-1 dimensions are copied as they are
2073 std::memcpy(c_buff + *offset, &start[dim_idx[b]], sizeof(T));
2074 *offset += sizeof(T);
2075 }
2076 }
2077 }
2078 }
2079 }
2080
// Explicit template instantiations for all the fixed-width integer types
// usable as dense dimension types.
template void ReaderBase::compute_result_space_tiles<int8_t>(
    const Subarray* subarray,
    const Subarray* partitioner_subarray,
    std::map<const int8_t*, ResultSpaceTile<int8_t>>* result_space_tiles) const;
template void ReaderBase::compute_result_space_tiles<uint8_t>(
    const Subarray* subarray,
    const Subarray* partitioner_subarray,
    std::map<const uint8_t*, ResultSpaceTile<uint8_t>>* result_space_tiles)
    const;
template void ReaderBase::compute_result_space_tiles<int16_t>(
    const Subarray* subarray,
    const Subarray* partitioner_subarray,
    std::map<const int16_t*, ResultSpaceTile<int16_t>>* result_space_tiles)
    const;
template void ReaderBase::compute_result_space_tiles<uint16_t>(
    const Subarray* subarray,
    const Subarray* partitioner_subarray,
    std::map<const uint16_t*, ResultSpaceTile<uint16_t>>* result_space_tiles)
    const;
template void ReaderBase::compute_result_space_tiles<int32_t>(
    const Subarray* subarray,
    const Subarray* partitioner_subarray,
    std::map<const int32_t*, ResultSpaceTile<int32_t>>* result_space_tiles)
    const;
template void ReaderBase::compute_result_space_tiles<uint32_t>(
    const Subarray* subarray,
    const Subarray* partitioner_subarray,
    std::map<const uint32_t*, ResultSpaceTile<uint32_t>>* result_space_tiles)
    const;
template void ReaderBase::compute_result_space_tiles<int64_t>(
    const Subarray* subarray,
    const Subarray* partitioner_subarray,
    std::map<const int64_t*, ResultSpaceTile<int64_t>>* result_space_tiles)
    const;
template void ReaderBase::compute_result_space_tiles<uint64_t>(
    const Subarray* subarray,
    const Subarray* partitioner_subarray,
    std::map<const uint64_t*, ResultSpaceTile<uint64_t>>* result_space_tiles)
    const;

// `fill_dense_coords` is instantiated for the same set of integer types.
template Status ReaderBase::fill_dense_coords<int8_t>(const Subarray&);
template Status ReaderBase::fill_dense_coords<uint8_t>(const Subarray&);
template Status ReaderBase::fill_dense_coords<int16_t>(const Subarray&);
template Status ReaderBase::fill_dense_coords<uint16_t>(const Subarray&);
template Status ReaderBase::fill_dense_coords<int32_t>(const Subarray&);
template Status ReaderBase::fill_dense_coords<uint32_t>(const Subarray&);
template Status ReaderBase::fill_dense_coords<int64_t>(const Subarray&);
template Status ReaderBase::fill_dense_coords<uint64_t>(const Subarray&);
2130
2131 } // namespace sm
2132 } // namespace tiledb
2133