1 /**
2 * @file sparse_index_reader_base.cc
3 *
4 * @section LICENSE
5 *
6 * The MIT License
7 *
8 * @copyright Copyright (c) 2017-2021 TileDB, Inc.
9 *
10 * Permission is hereby granted, free of charge, to any person obtaining a copy
11 * of this software and associated documentation files (the "Software"), to deal
12 * in the Software without restriction, including without limitation the rights
13 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
14 * copies of the Software, and to permit persons to whom the Software is
15 * furnished to do so, subject to the following conditions:
16 *
17 * The above copyright notice and this permission notice shall be included in
18 * all copies or substantial portions of the Software.
19 *
20 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
21 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
22 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
23 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
24 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
25 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
26 * THE SOFTWARE.
27 *
28 * @section DESCRIPTION
29 *
30 * This file implements class SparseIndexReaderBase.
31 */
32
33 #include "tiledb/sm/query/sparse_index_reader_base.h"
34 #include "tiledb/common/logger.h"
35 #include "tiledb/sm/array/array.h"
36 #include "tiledb/sm/array_schema/array_schema.h"
37 #include "tiledb/sm/filesystem/vfs.h"
38 #include "tiledb/sm/fragment/fragment_metadata.h"
39 #include "tiledb/sm/misc/parallel_functions.h"
40 #include "tiledb/sm/query/query_macros.h"
41 #include "tiledb/sm/query/strategy_base.h"
42 #include "tiledb/sm/storage_manager/open_array_memory_tracker.h"
43 #include "tiledb/sm/subarray/subarray.h"
44
45 namespace tiledb {
46 namespace sm {
47
48 /* ****************************** */
49 /* CONSTRUCTORS */
50 /* ****************************** */
51
SparseIndexReaderBase(stats::Stats * stats,tdb_shared_ptr<Logger> logger,StorageManager * storage_manager,Array * array,Config & config,std::unordered_map<std::string,QueryBuffer> & buffers,Subarray & subarray,Layout layout,QueryCondition & condition)52 SparseIndexReaderBase::SparseIndexReaderBase(
53 stats::Stats* stats,
54 tdb_shared_ptr<Logger> logger,
55 StorageManager* storage_manager,
56 Array* array,
57 Config& config,
58 std::unordered_map<std::string, QueryBuffer>& buffers,
59 Subarray& subarray,
60 Layout layout,
61 QueryCondition& condition)
62 : ReaderBase(
63 stats,
64 logger,
65 storage_manager,
66 array,
67 config,
68 buffers,
69 subarray,
70 layout,
71 condition)
72 , initial_data_loaded_(false)
73 , memory_budget_(0)
74 , array_memory_tracker_(nullptr)
75 , memory_used_for_coords_total_(0)
76 , memory_used_qc_tiles_(0)
77 , memory_used_rcs_(0)
78 , memory_used_result_tiles_(0)
79 , memory_used_result_tile_ranges_(0)
80 , memory_budget_ratio_coords_(0.5)
81 , memory_budget_ratio_query_condition_(0.25)
82 , memory_budget_ratio_tile_ranges_(0.1)
83 , memory_budget_ratio_array_data_(0.1)
84 , memory_budget_ratio_result_tiles_(0.05)
85 , memory_budget_ratio_rcs_(0.05)
86 , coords_loaded_(true) {
87 read_state_.done_adding_result_tiles_ = false;
88 }
89
90 /* ****************************** */
91 /* PROTECTED METHODS */
92 /* ****************************** */
93
read_state() const94 const SparseIndexReaderBase::ReadState* SparseIndexReaderBase::read_state()
95 const {
96 return &read_state_;
97 }
98
read_state()99 SparseIndexReaderBase::ReadState* SparseIndexReaderBase::read_state() {
100 return &read_state_;
101 }
102
get_coord_tiles_size(unsigned dim_num,unsigned f,uint64_t t,uint64_t * tiles_size)103 Status SparseIndexReaderBase::get_coord_tiles_size(
104 unsigned dim_num, unsigned f, uint64_t t, uint64_t* tiles_size) {
105 *tiles_size = 0;
106 for (unsigned d = 0; d < dim_num; d++) {
107 *tiles_size += fragment_metadata_[f]->tile_size(dim_names_[d], t);
108
109 if (is_dim_var_size_[d]) {
110 uint64_t temp = 0;
111 RETURN_NOT_OK(
112 fragment_metadata_[f]->tile_var_size(dim_names_[d], t, &temp));
113 *tiles_size += temp;
114 }
115 }
116
117 return Status::Ok();
118 }
119
load_initial_data()120 Status SparseIndexReaderBase::load_initial_data() {
121 if (initial_data_loaded_)
122 return Status::Ok();
123
124 auto timer_se = stats_->start_timer("load_initial_data");
125 read_state_.done_adding_result_tiles_ = false;
126
127 // For easy reference.
128 auto fragment_num = fragment_metadata_.size();
129
130 // Make sure there is enough space for tiles data.
131 read_state_.frag_tile_idx_.clear();
132 all_tiles_loaded_.clear();
133 read_state_.frag_tile_idx_.resize(fragment_num);
134 all_tiles_loaded_.resize(fragment_num);
135
136 // Calculate ranges of tiles in the subarray, if set.
137 if (subarray_.is_set()) {
138 // At this point, full memory budget is available.
139 array_memory_tracker_->set_budget(memory_budget_);
140
141 // Make sure there is no memory taken by the subarray.
142 subarray_.clear_tile_overlap();
143
144 // Tile ranges computation will not stop if it exceeds memory budget.
145 // This is ok as it is a soft limit and will be taken into consideration
146 // later.
147 RETURN_NOT_OK(subarray_.precompute_all_ranges_tile_overlap(
148 storage_manager_->compute_tp(), &result_tile_ranges_));
149
150 for (auto frag_result_tile_ranges : result_tile_ranges_) {
151 memory_used_result_tile_ranges_ += frag_result_tile_ranges.size() *
152 sizeof(std::pair<uint64_t, uint64_t>);
153 }
154
155 if (memory_used_result_tile_ranges_ >
156 memory_budget_ratio_tile_ranges_ * memory_budget_)
157 return logger_->status(
158 Status::ReaderError("Exceeded memory budget for result tile ranges"));
159 }
160
161 // Set a limit to the array memory.
162 array_memory_tracker_->set_budget(
163 memory_budget_ * memory_budget_ratio_array_data_);
164
165 // Preload zipped coordinate tile offsets. Note that this will
166 // ignore fragments with a version >= 5.
167 std::vector<std::string> zipped_coords_names = {constants::coords};
168 RETURN_CANCEL_OR_ERROR(load_tile_offsets(&subarray_, &zipped_coords_names));
169
170 // Preload unzipped coordinate tile offsets. Note that this will
171 // ignore fragments with a version < 5.
172 const auto dim_num = array_schema_->dim_num();
173 dim_names_.reserve(dim_num);
174 is_dim_var_size_.reserve(dim_num);
175 std::vector<std::string> var_size_to_load;
176 for (unsigned d = 0; d < dim_num; ++d) {
177 dim_names_.emplace_back(array_schema_->dimension(d)->name());
178 is_dim_var_size_[d] = array_schema_->var_size(dim_names_[d]);
179 if (is_dim_var_size_[d])
180 var_size_to_load.emplace_back(dim_names_[d]);
181 }
182 RETURN_CANCEL_OR_ERROR(load_tile_offsets(&subarray_, &dim_names_));
183
184 for (auto& it : buffers_) {
185 const auto& name = it.first;
186 if (array_schema_->is_dim(name))
187 continue;
188
189 if (array_schema_->var_size(name))
190 var_size_to_load.emplace_back(name);
191 }
192 RETURN_CANCEL_OR_ERROR(load_tile_var_sizes(&subarray_, &var_size_to_load));
193
194 logger_->debug("Initial data loaded");
195 initial_data_loaded_ = true;
196 return Status::Ok();
197 }
198
/**
 * Comparator ordering tuples by their second element, largest first.
 *
 * @param a Left-hand tuple.
 * @param b Right-hand tuple.
 * @return True if the second element of `a` is strictly greater than the
 *     second element of `b`; the other elements are ignored.
 */
bool reverse_tuple_sort_by_second(
    const tuple<uint64_t, uint64_t, uint64_t>& a,
    const tuple<uint64_t, uint64_t, uint64_t>& b) {
  const uint64_t lhs_key = std::get<1>(a);
  const uint64_t rhs_key = std::get<1>(b);
  return rhs_key < lhs_key;
}
205
compute_coord_tiles_result_bitmap(ResultTile * tile,uint64_t range_idx,std::vector<uint8_t> * coord_tiles_result_bitmap)206 Status SparseIndexReaderBase::compute_coord_tiles_result_bitmap(
207 ResultTile* tile,
208 uint64_t range_idx,
209 std::vector<uint8_t>* coord_tiles_result_bitmap) {
210 auto timer_se = stats_->start_timer("compute_coord_tiles_result_bitmap");
211
212 // For easy reference.
213 auto dim_num = array_schema_->dim_num();
214 auto cell_order = array_schema_->cell_order();
215 auto range_coords = subarray_.get_range_coords(range_idx);
216
217 // Compute result and overwritten bitmap per dimension
218 for (unsigned d = 0; d < dim_num; ++d) {
219 // For col-major cell ordering, iterate the dimensions
220 // in reverse.
221 const unsigned dim_idx =
222 cell_order == Layout::COL_MAJOR ? dim_num - d - 1 : d;
223 if (!subarray_.is_default(dim_idx)) {
224 const auto& ranges = subarray_.ranges_for_dim(dim_idx);
225 RETURN_NOT_OK(tile->compute_results_sparse(
226 dim_idx,
227 ranges[range_coords[dim_idx]],
228 coord_tiles_result_bitmap,
229 cell_order));
230 }
231 }
232
233 return Status::Ok();
234 }
235
resize_output_buffers()236 Status SparseIndexReaderBase::resize_output_buffers() {
237 // Count number of elements actually copied.
238 uint64_t cells_copied = 0;
239 for (uint64_t i = 0; i < copy_end_.first - 1; i++) {
240 cells_copied += read_state_.result_cell_slabs_[i].length_;
241 }
242
243 cells_copied += copy_end_.second;
244
245 // Resize buffers if the result cell slabs was truncated.
246 for (auto& it : buffers_) {
247 const auto& name = it.first;
248 const auto size = *it.second.buffer_size_;
249 uint64_t num_cells = 0;
250
251 if (array_schema_->var_size(name)) {
252 // Get the current number of cells from the offsets buffer.
253 num_cells = size / constants::cell_var_offset_size;
254
255 // Remove an element if the extra element flag is set.
256 if (offsets_extra_element_ && num_cells > 0)
257 num_cells--;
258
259 // Buffer should be resized.
260 if (num_cells > cells_copied) {
261 // Offsets buffer is trivial.
262 *(it.second.buffer_size_) =
263 cells_copied * constants::cell_var_offset_size +
264 offsets_extra_element_;
265
266 // Since the buffer is shrunk, there is an offset for the next element
267 // loaded, use it.
268 *(it.second.buffer_var_size_) =
269 ((uint64_t*)it.second.buffer_)[cells_copied];
270 }
271 } else {
272 // Always adjust the size for fixed size attributes.
273 auto cell_size = array_schema_->cell_size(name);
274 *(it.second.buffer_size_) = cells_copied * cell_size;
275 }
276
277 // Always adjust validity vector size, if present.
278 if (num_cells > cells_copied) {
279 if (it.second.validity_vector_.buffer_size() != nullptr)
280 *(it.second.validity_vector_.buffer_size()) =
281 num_cells * constants::cell_validity_size;
282 }
283 }
284
285 return Status::Ok();
286 }
287
add_extra_offset()288 Status SparseIndexReaderBase::add_extra_offset() {
289 for (const auto& it : buffers_) {
290 const auto& name = it.first;
291 if (!array_schema_->var_size(name))
292 continue;
293
294 auto buffer = static_cast<unsigned char*>(it.second.buffer_);
295 if (offsets_format_mode_ == "bytes") {
296 memcpy(
297 buffer + *it.second.buffer_size_ - offsets_bytesize(),
298 it.second.buffer_var_size_,
299 offsets_bytesize());
300 } else if (offsets_format_mode_ == "elements") {
301 auto elements = *it.second.buffer_var_size_ /
302 datatype_size(array_schema_->type(name));
303 memcpy(
304 buffer + *it.second.buffer_size_ - offsets_bytesize(),
305 &elements,
306 offsets_bytesize());
307 } else {
308 return logger_->status(Status::ReaderError(
309 "Cannot add extra offset to buffer; Unsupported offsets format"));
310 }
311 }
312
313 return Status::Ok();
314 }
315
remove_result_tile_range(uint64_t f)316 void SparseIndexReaderBase::remove_result_tile_range(uint64_t f) {
317 result_tile_ranges_[f].pop_back();
318 {
319 std::unique_lock<std::mutex> lck(mem_budget_mtx_);
320 memory_used_result_tile_ranges_ -= sizeof(std::pair<uint64_t, uint64_t>);
321 }
322 }
323
324 } // namespace sm
325 } // namespace tiledb
326