1 /**
2  * @file   sparse_index_reader_base.cc
3  *
4  * @section LICENSE
5  *
6  * The MIT License
7  *
8  * @copyright Copyright (c) 2017-2021 TileDB, Inc.
9  *
10  * Permission is hereby granted, free of charge, to any person obtaining a copy
11  * of this software and associated documentation files (the "Software"), to deal
12  * in the Software without restriction, including without limitation the rights
13  * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
14  * copies of the Software, and to permit persons to whom the Software is
15  * furnished to do so, subject to the following conditions:
16  *
17  * The above copyright notice and this permission notice shall be included in
18  * all copies or substantial portions of the Software.
19  *
20  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
21  * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
22  * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
23  * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
24  * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
25  * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
26  * THE SOFTWARE.
27  *
28  * @section DESCRIPTION
29  *
30  * This file implements class SparseIndexReaderBase.
31  */
32 
33 #include "tiledb/sm/query/sparse_index_reader_base.h"
34 #include "tiledb/common/logger.h"
35 #include "tiledb/sm/array/array.h"
36 #include "tiledb/sm/array_schema/array_schema.h"
37 #include "tiledb/sm/filesystem/vfs.h"
38 #include "tiledb/sm/fragment/fragment_metadata.h"
39 #include "tiledb/sm/misc/parallel_functions.h"
40 #include "tiledb/sm/query/query_macros.h"
41 #include "tiledb/sm/query/strategy_base.h"
42 #include "tiledb/sm/storage_manager/open_array_memory_tracker.h"
43 #include "tiledb/sm/subarray/subarray.h"
44 
45 namespace tiledb {
46 namespace sm {
47 
48 /* ****************************** */
49 /*          CONSTRUCTORS          */
50 /* ****************************** */
51 
SparseIndexReaderBase(stats::Stats * stats,tdb_shared_ptr<Logger> logger,StorageManager * storage_manager,Array * array,Config & config,std::unordered_map<std::string,QueryBuffer> & buffers,Subarray & subarray,Layout layout,QueryCondition & condition)52 SparseIndexReaderBase::SparseIndexReaderBase(
53     stats::Stats* stats,
54     tdb_shared_ptr<Logger> logger,
55     StorageManager* storage_manager,
56     Array* array,
57     Config& config,
58     std::unordered_map<std::string, QueryBuffer>& buffers,
59     Subarray& subarray,
60     Layout layout,
61     QueryCondition& condition)
62     : ReaderBase(
63           stats,
64           logger,
65           storage_manager,
66           array,
67           config,
68           buffers,
69           subarray,
70           layout,
71           condition)
72     , initial_data_loaded_(false)
73     , memory_budget_(0)
74     , array_memory_tracker_(nullptr)
75     , memory_used_for_coords_total_(0)
76     , memory_used_qc_tiles_(0)
77     , memory_used_rcs_(0)
78     , memory_used_result_tiles_(0)
79     , memory_used_result_tile_ranges_(0)
80     , memory_budget_ratio_coords_(0.5)
81     , memory_budget_ratio_query_condition_(0.25)
82     , memory_budget_ratio_tile_ranges_(0.1)
83     , memory_budget_ratio_array_data_(0.1)
84     , memory_budget_ratio_result_tiles_(0.05)
85     , memory_budget_ratio_rcs_(0.05)
86     , coords_loaded_(true) {
87   read_state_.done_adding_result_tiles_ = false;
88 }
89 
90 /* ****************************** */
91 /*        PROTECTED METHODS       */
92 /* ****************************** */
93 
read_state() const94 const SparseIndexReaderBase::ReadState* SparseIndexReaderBase::read_state()
95     const {
96   return &read_state_;
97 }
98 
read_state()99 SparseIndexReaderBase::ReadState* SparseIndexReaderBase::read_state() {
100   return &read_state_;
101 }
102 
get_coord_tiles_size(unsigned dim_num,unsigned f,uint64_t t,uint64_t * tiles_size)103 Status SparseIndexReaderBase::get_coord_tiles_size(
104     unsigned dim_num, unsigned f, uint64_t t, uint64_t* tiles_size) {
105   *tiles_size = 0;
106   for (unsigned d = 0; d < dim_num; d++) {
107     *tiles_size += fragment_metadata_[f]->tile_size(dim_names_[d], t);
108 
109     if (is_dim_var_size_[d]) {
110       uint64_t temp = 0;
111       RETURN_NOT_OK(
112           fragment_metadata_[f]->tile_var_size(dim_names_[d], t, &temp));
113       *tiles_size += temp;
114     }
115   }
116 
117   return Status::Ok();
118 }
119 
load_initial_data()120 Status SparseIndexReaderBase::load_initial_data() {
121   if (initial_data_loaded_)
122     return Status::Ok();
123 
124   auto timer_se = stats_->start_timer("load_initial_data");
125   read_state_.done_adding_result_tiles_ = false;
126 
127   // For easy reference.
128   auto fragment_num = fragment_metadata_.size();
129 
130   // Make sure there is enough space for tiles data.
131   read_state_.frag_tile_idx_.clear();
132   all_tiles_loaded_.clear();
133   read_state_.frag_tile_idx_.resize(fragment_num);
134   all_tiles_loaded_.resize(fragment_num);
135 
136   // Calculate ranges of tiles in the subarray, if set.
137   if (subarray_.is_set()) {
138     // At this point, full memory budget is available.
139     array_memory_tracker_->set_budget(memory_budget_);
140 
141     // Make sure there is no memory taken by the subarray.
142     subarray_.clear_tile_overlap();
143 
144     // Tile ranges computation will not stop if it exceeds memory budget.
145     // This is ok as it is a soft limit and will be taken into consideration
146     // later.
147     RETURN_NOT_OK(subarray_.precompute_all_ranges_tile_overlap(
148         storage_manager_->compute_tp(), &result_tile_ranges_));
149 
150     for (auto frag_result_tile_ranges : result_tile_ranges_) {
151       memory_used_result_tile_ranges_ += frag_result_tile_ranges.size() *
152                                          sizeof(std::pair<uint64_t, uint64_t>);
153     }
154 
155     if (memory_used_result_tile_ranges_ >
156         memory_budget_ratio_tile_ranges_ * memory_budget_)
157       return logger_->status(
158           Status::ReaderError("Exceeded memory budget for result tile ranges"));
159   }
160 
161   // Set a limit to the array memory.
162   array_memory_tracker_->set_budget(
163       memory_budget_ * memory_budget_ratio_array_data_);
164 
165   // Preload zipped coordinate tile offsets. Note that this will
166   // ignore fragments with a version >= 5.
167   std::vector<std::string> zipped_coords_names = {constants::coords};
168   RETURN_CANCEL_OR_ERROR(load_tile_offsets(&subarray_, &zipped_coords_names));
169 
170   // Preload unzipped coordinate tile offsets. Note that this will
171   // ignore fragments with a version < 5.
172   const auto dim_num = array_schema_->dim_num();
173   dim_names_.reserve(dim_num);
174   is_dim_var_size_.reserve(dim_num);
175   std::vector<std::string> var_size_to_load;
176   for (unsigned d = 0; d < dim_num; ++d) {
177     dim_names_.emplace_back(array_schema_->dimension(d)->name());
178     is_dim_var_size_[d] = array_schema_->var_size(dim_names_[d]);
179     if (is_dim_var_size_[d])
180       var_size_to_load.emplace_back(dim_names_[d]);
181   }
182   RETURN_CANCEL_OR_ERROR(load_tile_offsets(&subarray_, &dim_names_));
183 
184   for (auto& it : buffers_) {
185     const auto& name = it.first;
186     if (array_schema_->is_dim(name))
187       continue;
188 
189     if (array_schema_->var_size(name))
190       var_size_to_load.emplace_back(name);
191   }
192   RETURN_CANCEL_OR_ERROR(load_tile_var_sizes(&subarray_, &var_size_to_load));
193 
194   logger_->debug("Initial data loaded");
195   initial_data_loaded_ = true;
196   return Status::Ok();
197 }
198 
// Comparator for sorting tuples in descending order of their second
// element: returns true when `a` must be placed before `b`.
bool reverse_tuple_sort_by_second(
    const std::tuple<uint64_t, uint64_t, uint64_t>& a,
    const std::tuple<uint64_t, uint64_t, uint64_t>& b) {
  const uint64_t second_of_a = std::get<1>(a);
  const uint64_t second_of_b = std::get<1>(b);
  return second_of_b < second_of_a;
}
205 
compute_coord_tiles_result_bitmap(ResultTile * tile,uint64_t range_idx,std::vector<uint8_t> * coord_tiles_result_bitmap)206 Status SparseIndexReaderBase::compute_coord_tiles_result_bitmap(
207     ResultTile* tile,
208     uint64_t range_idx,
209     std::vector<uint8_t>* coord_tiles_result_bitmap) {
210   auto timer_se = stats_->start_timer("compute_coord_tiles_result_bitmap");
211 
212   // For easy reference.
213   auto dim_num = array_schema_->dim_num();
214   auto cell_order = array_schema_->cell_order();
215   auto range_coords = subarray_.get_range_coords(range_idx);
216 
217   // Compute result and overwritten bitmap per dimension
218   for (unsigned d = 0; d < dim_num; ++d) {
219     // For col-major cell ordering, iterate the dimensions
220     // in reverse.
221     const unsigned dim_idx =
222         cell_order == Layout::COL_MAJOR ? dim_num - d - 1 : d;
223     if (!subarray_.is_default(dim_idx)) {
224       const auto& ranges = subarray_.ranges_for_dim(dim_idx);
225       RETURN_NOT_OK(tile->compute_results_sparse(
226           dim_idx,
227           ranges[range_coords[dim_idx]],
228           coord_tiles_result_bitmap,
229           cell_order));
230     }
231   }
232 
233   return Status::Ok();
234 }
235 
resize_output_buffers()236 Status SparseIndexReaderBase::resize_output_buffers() {
237   // Count number of elements actually copied.
238   uint64_t cells_copied = 0;
239   for (uint64_t i = 0; i < copy_end_.first - 1; i++) {
240     cells_copied += read_state_.result_cell_slabs_[i].length_;
241   }
242 
243   cells_copied += copy_end_.second;
244 
245   // Resize buffers if the result cell slabs was truncated.
246   for (auto& it : buffers_) {
247     const auto& name = it.first;
248     const auto size = *it.second.buffer_size_;
249     uint64_t num_cells = 0;
250 
251     if (array_schema_->var_size(name)) {
252       // Get the current number of cells from the offsets buffer.
253       num_cells = size / constants::cell_var_offset_size;
254 
255       // Remove an element if the extra element flag is set.
256       if (offsets_extra_element_ && num_cells > 0)
257         num_cells--;
258 
259       // Buffer should be resized.
260       if (num_cells > cells_copied) {
261         // Offsets buffer is trivial.
262         *(it.second.buffer_size_) =
263             cells_copied * constants::cell_var_offset_size +
264             offsets_extra_element_;
265 
266         // Since the buffer is shrunk, there is an offset for the next element
267         // loaded, use it.
268         *(it.second.buffer_var_size_) =
269             ((uint64_t*)it.second.buffer_)[cells_copied];
270       }
271     } else {
272       // Always adjust the size for fixed size attributes.
273       auto cell_size = array_schema_->cell_size(name);
274       *(it.second.buffer_size_) = cells_copied * cell_size;
275     }
276 
277     // Always adjust validity vector size, if present.
278     if (num_cells > cells_copied) {
279       if (it.second.validity_vector_.buffer_size() != nullptr)
280         *(it.second.validity_vector_.buffer_size()) =
281             num_cells * constants::cell_validity_size;
282     }
283   }
284 
285   return Status::Ok();
286 }
287 
add_extra_offset()288 Status SparseIndexReaderBase::add_extra_offset() {
289   for (const auto& it : buffers_) {
290     const auto& name = it.first;
291     if (!array_schema_->var_size(name))
292       continue;
293 
294     auto buffer = static_cast<unsigned char*>(it.second.buffer_);
295     if (offsets_format_mode_ == "bytes") {
296       memcpy(
297           buffer + *it.second.buffer_size_ - offsets_bytesize(),
298           it.second.buffer_var_size_,
299           offsets_bytesize());
300     } else if (offsets_format_mode_ == "elements") {
301       auto elements = *it.second.buffer_var_size_ /
302                       datatype_size(array_schema_->type(name));
303       memcpy(
304           buffer + *it.second.buffer_size_ - offsets_bytesize(),
305           &elements,
306           offsets_bytesize());
307     } else {
308       return logger_->status(Status::ReaderError(
309           "Cannot add extra offset to buffer; Unsupported offsets format"));
310     }
311   }
312 
313   return Status::Ok();
314 }
315 
remove_result_tile_range(uint64_t f)316 void SparseIndexReaderBase::remove_result_tile_range(uint64_t f) {
317   result_tile_ranges_[f].pop_back();
318   {
319     std::unique_lock<std::mutex> lck(mem_budget_mtx_);
320     memory_used_result_tile_ranges_ -= sizeof(std::pair<uint64_t, uint64_t>);
321   }
322 }
323 
324 }  // namespace sm
325 }  // namespace tiledb
326