1 /**
2 * @file query.cc
3 *
4 * @section LICENSE
5 *
6 * The MIT License
7 *
8 * @copyright Copyright (c) 2017-2021 TileDB, Inc.
9 * @copyright Copyright (c) 2016 MIT and Intel Corporation
10 *
11 * Permission is hereby granted, free of charge, to any person obtaining a copy
12 * of this software and associated documentation files (the "Software"), to deal
13 * in the Software without restriction, including without limitation the rights
14 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
15 * copies of the Software, and to permit persons to whom the Software is
16 * furnished to do so, subject to the following conditions:
17 *
18 * The above copyright notice and this permission notice shall be included in
19 * all copies or substantial portions of the Software.
20 *
21 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
22 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
23 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
24 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
25 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
26 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
27 * THE SOFTWARE.
28 *
29 * @section DESCRIPTION
30 *
31 * This file defines serialization for the Query class
32 */
33
34 // clang-format off
35 #ifdef TILEDB_SERIALIZATION
36 #include <capnp/compat/json.h>
37 #include <capnp/message.h>
38 #include <capnp/serialize.h>
39 #include "tiledb/sm/serialization/capnp_utils.h"
40 #endif
41 // clang-format on
42
43 #include "tiledb/common/heap_memory.h"
44 #include "tiledb/common/logger.h"
45 #include "tiledb/sm/array/array.h"
46 #include "tiledb/sm/buffer/buffer_list.h"
47 #include "tiledb/sm/config/config.h"
48 #include "tiledb/sm/enums/layout.h"
49 #include "tiledb/sm/enums/query_condition_combination_op.h"
50 #include "tiledb/sm/enums/query_status.h"
51 #include "tiledb/sm/enums/query_type.h"
52 #include "tiledb/sm/enums/serialization_type.h"
53 #include "tiledb/sm/fragment/fragment_metadata.h"
54 #include "tiledb/sm/misc/utils.h"
55 #include "tiledb/sm/query/query.h"
56 #include "tiledb/sm/query/reader.h"
57 #include "tiledb/sm/query/dense_reader.h"
58 #include "tiledb/sm/query/sparse_global_order_reader.h"
59 #include "tiledb/sm/query/sparse_unordered_with_dups_reader.h"
60 #include "tiledb/sm/query/writer.h"
61 #include "tiledb/sm/serialization/config.h"
62 #include "tiledb/sm/serialization/query.h"
63 #include "tiledb/sm/subarray/subarray.h"
64 #include "tiledb/sm/subarray/subarray_partitioner.h"
65
66 using namespace tiledb::common;
67 using namespace tiledb::sm::stats;
68
69 namespace tiledb {
70 namespace sm {
71 namespace serialization {
72
73 #ifdef TILEDB_SERIALIZATION
74
75 enum class SerializationContext { CLIENT, SERVER, BACKUP };
76
77 tdb_shared_ptr<Logger> dummy_logger = tdb_make_shared(Logger, "");
78
stats_to_capnp(Stats & stats,capnp::Stats::Builder * stats_builder)79 Status stats_to_capnp(Stats& stats, capnp::Stats::Builder* stats_builder) {
80 // Build counters
81 const auto counters = stats.counters();
82 if (counters != nullptr && !counters->empty()) {
83 auto counters_builder = stats_builder->initCounters();
84 auto entries_builder = counters_builder.initEntries(counters->size());
85 uint64_t index = 0;
86 for (const auto& entry : *counters) {
87 entries_builder[index].setKey(entry.first);
88 entries_builder[index].setValue(entry.second);
89 ++index;
90 }
91 }
92
93 // Build timers
94 const auto timers = stats.timers();
95 if (timers != nullptr && !timers->empty()) {
96 auto timers_builder = stats_builder->initTimers();
97 auto entries_builder = timers_builder.initEntries(timers->size());
98 uint64_t index = 0;
99 for (const auto& entry : *timers) {
100 entries_builder[index].setKey(entry.first);
101 entries_builder[index].setValue(entry.second);
102 ++index;
103 }
104 }
105
106 return Status::Ok();
107 }
108
stats_from_capnp(const capnp::Stats::Reader & stats_reader,Stats * stats)109 Status stats_from_capnp(
110 const capnp::Stats::Reader& stats_reader, Stats* stats) {
111 if (stats_reader.hasCounters()) {
112 auto counters = stats->counters();
113 auto counters_reader = stats_reader.getCounters();
114 for (const auto entry : counters_reader.getEntries()) {
115 (*counters)[std::string(entry.getKey().cStr())] = entry.getValue();
116 }
117 }
118
119 if (stats_reader.hasTimers()) {
120 auto timers = stats->timers();
121 auto timers_reader = stats_reader.getCounters();
122 for (const auto entry : timers_reader.getEntries()) {
123 (*timers)[std::string(entry.getKey().cStr())] = entry.getValue();
124 }
125 }
126
127 return Status::Ok();
128 }
129
array_to_capnp(const Array & array,capnp::Array::Builder * array_builder)130 Status array_to_capnp(
131 const Array& array, capnp::Array::Builder* array_builder) {
132 array_builder->setStartTimestamp(array.timestamp_start());
133 array_builder->setEndTimestamp(array.timestamp_end());
134
135 return Status::Ok();
136 }
137
array_from_capnp(const capnp::Array::Reader & array_reader,Array * array)138 Status array_from_capnp(
139 const capnp::Array::Reader& array_reader, Array* array) {
140 RETURN_NOT_OK(array->set_timestamp_start(array_reader.getStartTimestamp()));
141 RETURN_NOT_OK(array->set_timestamp_end(array_reader.getEndTimestamp()));
142
143 return Status::Ok();
144 }
145
subarray_to_capnp(const ArraySchema * schema,const Subarray * subarray,capnp::Subarray::Builder * builder)146 Status subarray_to_capnp(
147 const ArraySchema* schema,
148 const Subarray* subarray,
149 capnp::Subarray::Builder* builder) {
150 builder->setLayout(layout_str(subarray->layout()));
151
152 const uint32_t dim_num = subarray->dim_num();
153 auto ranges_builder = builder->initRanges(dim_num);
154 for (uint32_t i = 0; i < dim_num; i++) {
155 const auto datatype = schema->dimension(i)->type();
156 auto range_builder = ranges_builder[i];
157 const auto& ranges = subarray->ranges_for_dim(i);
158 range_builder.setType(datatype_str(datatype));
159
160 range_builder.setHasDefaultRange(subarray->is_default(i));
161 auto range_sizes = range_builder.initBufferSizes(ranges.size());
162 auto range_start_sizes = range_builder.initBufferStartSizes(ranges.size());
163 // This will copy all of the ranges into one large byte vector
164 // Future improvement is to do this in a zero copy manner
165 // (kj::ArrayBuilder?)
166 auto capnpVector = kj::Vector<uint8_t>();
167 uint64_t range_idx = 0;
168 for (auto& range : ranges) {
169 capnpVector.addAll(kj::ArrayPtr<uint8_t>(
170 const_cast<uint8_t*>(reinterpret_cast<const uint8_t*>(range.data())),
171 range.size()));
172 range_sizes.set(range_idx, range.size());
173 range_start_sizes.set(range_idx, range.start_size());
174 ++range_idx;
175 }
176 range_builder.setBuffer(capnpVector.asPtr());
177 }
178
179 // If stats object exists set its cap'n proto object
180 stats::Stats* stats = subarray->stats();
181 if (stats != nullptr) {
182 auto stats_builder = builder->initStats();
183 RETURN_NOT_OK(stats_to_capnp(*stats, &stats_builder));
184 }
185
186 if (subarray->relevant_fragments()->size() > 0) {
187 auto relevant_fragments_builder =
188 builder->initRelevantFragments(subarray->relevant_fragments()->size());
189 for (size_t i = 0; i < subarray->relevant_fragments()->size(); ++i) {
190 relevant_fragments_builder.set(i, subarray->relevant_fragments()->at(i));
191 }
192 }
193
194 return Status::Ok();
195 }
196
subarray_from_capnp(const capnp::Subarray::Reader & reader,Subarray * subarray)197 Status subarray_from_capnp(
198 const capnp::Subarray::Reader& reader, Subarray* subarray) {
199 auto ranges_reader = reader.getRanges();
200 uint32_t dim_num = ranges_reader.size();
201 for (uint32_t i = 0; i < dim_num; i++) {
202 auto range_reader = ranges_reader[i];
203 Datatype type = Datatype::UINT8;
204 RETURN_NOT_OK(datatype_enum(range_reader.getType(), &type));
205
206 auto data = range_reader.getBuffer();
207 auto data_ptr = data.asBytes();
208 if (range_reader.hasBufferSizes()) {
209 auto buffer_sizes = range_reader.getBufferSizes();
210 auto buffer_start_sizes = range_reader.getBufferStartSizes();
211 size_t range_count = buffer_sizes.size();
212 std::vector<Range> ranges(range_count);
213 uint64_t offset = 0;
214 for (size_t j = 0; j < range_count; j++) {
215 uint64_t range_size = buffer_sizes[j];
216 uint64_t range_start_size = buffer_start_sizes[j];
217 if (range_start_size != 0) {
218 ranges[j] =
219 Range(data_ptr.begin() + offset, range_size, range_start_size);
220 } else {
221 ranges[j] = Range(data_ptr.begin() + offset, range_size);
222 }
223 offset += range_size;
224 }
225
226 RETURN_NOT_OK(subarray->set_ranges_for_dim(i, ranges));
227
228 // Set default indicator
229 subarray->set_is_default(i, range_reader.getHasDefaultRange());
230 } else {
231 // Handle 1.7 style ranges where there is a single range with no sizes
232 Range range(data_ptr.begin(), data.size());
233 RETURN_NOT_OK(subarray->set_ranges_for_dim(i, {range}));
234 subarray->set_is_default(i, range_reader.getHasDefaultRange());
235 }
236 }
237
238 // If cap'n proto object has stats set it on c++ object
239 if (reader.hasStats()) {
240 stats::Stats* stats = subarray->stats();
241 // We should always have a stats here
242 if (stats != nullptr) {
243 RETURN_NOT_OK(stats_from_capnp(reader.getStats(), stats));
244 }
245 }
246
247 if (reader.hasRelevantFragments()) {
248 auto relevant_fragments = reader.getRelevantFragments();
249 size_t count = relevant_fragments.size();
250 subarray->relevant_fragments()->reserve(count);
251 for (size_t i = 0; i < count; i++) {
252 subarray->relevant_fragments()->emplace_back(relevant_fragments[i]);
253 }
254 }
255
256 return Status::Ok();
257 }
258
subarray_partitioner_to_capnp(const ArraySchema * schema,const SubarrayPartitioner & partitioner,capnp::SubarrayPartitioner::Builder * builder)259 Status subarray_partitioner_to_capnp(
260 const ArraySchema* schema,
261 const SubarrayPartitioner& partitioner,
262 capnp::SubarrayPartitioner::Builder* builder) {
263 // Subarray
264 auto subarray_builder = builder->initSubarray();
265 RETURN_NOT_OK(
266 subarray_to_capnp(schema, partitioner.subarray(), &subarray_builder));
267
268 // Per-attr/dim mem budgets
269 const auto* budgets = partitioner.get_result_budgets();
270 if (!budgets->empty()) {
271 auto mem_budgets_builder = builder->initBudget(budgets->size());
272 size_t idx = 0;
273 for (const auto& pair : (*budgets)) {
274 const std::string& name = pair.first;
275 auto budget_builder = mem_budgets_builder[idx];
276 budget_builder.setAttribute(name);
277 auto var_size = schema->var_size(name);
278
279 if (name == constants::coords || !var_size) {
280 budget_builder.setOffsetBytes(0);
281 budget_builder.setDataBytes(pair.second.size_fixed_);
282 } else {
283 budget_builder.setOffsetBytes(pair.second.size_fixed_);
284 budget_builder.setDataBytes(pair.second.size_var_);
285 }
286
287 budget_builder.setValidityBytes(pair.second.size_validity_);
288
289 idx++;
290 }
291 }
292
293 // Current partition info
294 const auto* partition_info = partitioner.current_partition_info();
295 // If the array is null that means there is no current partition info
296 if (partition_info->partition_.array() != nullptr) {
297 auto info_builder = builder->initCurrent();
298 auto info_subarray_builder = info_builder.initSubarray();
299 RETURN_NOT_OK(subarray_to_capnp(
300 schema, &partition_info->partition_, &info_subarray_builder));
301 info_builder.setStart(partition_info->start_);
302 info_builder.setEnd(partition_info->end_);
303 info_builder.setSplitMultiRange(partition_info->split_multi_range_);
304 }
305
306 // Partitioner state
307 const auto* state = partitioner.state();
308 auto state_builder = builder->initState();
309 state_builder.setStart(state->start_);
310 state_builder.setEnd(state->end_);
311 auto single_range_builder =
312 state_builder.initSingleRange(state->single_range_.size());
313 size_t sr_idx = 0;
314 for (const auto& subarray : state->single_range_) {
315 auto b = single_range_builder[sr_idx];
316 RETURN_NOT_OK(subarray_to_capnp(schema, &subarray, &b));
317 sr_idx++;
318 }
319 auto multi_range_builder =
320 state_builder.initMultiRange(state->multi_range_.size());
321 size_t m_idx = 0;
322 for (const auto& subarray : state->multi_range_) {
323 auto b = multi_range_builder[m_idx];
324 RETURN_NOT_OK(subarray_to_capnp(schema, &subarray, &b));
325 m_idx++;
326 }
327
328 // Overall mem budget
329 uint64_t mem_budget, mem_budget_var, mem_budget_validity;
330 RETURN_NOT_OK(partitioner.get_memory_budget(
331 &mem_budget, &mem_budget_var, &mem_budget_validity));
332 builder->setMemoryBudget(mem_budget);
333 builder->setMemoryBudgetVar(mem_budget_var);
334 builder->setMemoryBudgetValidity(mem_budget_validity);
335
336 // If stats object exists set its cap'n proto object
337 stats::Stats* stats = partitioner.stats();
338 if (stats != nullptr) {
339 auto stats_builder = builder->initStats();
340 RETURN_NOT_OK(stats_to_capnp(*stats, &stats_builder));
341 }
342
343 return Status::Ok();
344 }
345
subarray_partitioner_from_capnp(Stats * reader_stats,const Config * config,const Array * array,const capnp::SubarrayPartitioner::Reader & reader,SubarrayPartitioner * partitioner,ThreadPool * compute_tp,const bool & compute_current_tile_overlap)346 Status subarray_partitioner_from_capnp(
347 Stats* reader_stats,
348 const Config* config,
349 const Array* array,
350 const capnp::SubarrayPartitioner::Reader& reader,
351 SubarrayPartitioner* partitioner,
352 ThreadPool* compute_tp,
353 const bool& compute_current_tile_overlap) {
354 // Get memory budget
355 uint64_t memory_budget = 0;
356 RETURN_NOT_OK(tiledb::sm::utils::parse::convert(
357 Config::SM_MEMORY_BUDGET, &memory_budget));
358 uint64_t memory_budget_var = 0;
359 RETURN_NOT_OK(tiledb::sm::utils::parse::convert(
360 Config::SM_MEMORY_BUDGET_VAR, &memory_budget_var));
361 uint64_t memory_budget_validity = 0;
362
363 // Get subarray layout first
364 Layout layout = Layout::ROW_MAJOR;
365 auto subarray_reader = reader.getSubarray();
366 RETURN_NOT_OK(layout_enum(subarray_reader.getLayout(), &layout));
367
368 // Subarray, which is used to initialize the partitioner.
369 Subarray subarray(array, layout, reader_stats, dummy_logger, false);
370 RETURN_NOT_OK(subarray_from_capnp(reader.getSubarray(), &subarray));
371 *partitioner = SubarrayPartitioner(
372 config,
373 subarray,
374 memory_budget,
375 memory_budget_var,
376 memory_budget_validity,
377 compute_tp,
378 reader_stats,
379 dummy_logger);
380
381 // Per-attr mem budgets
382 if (reader.hasBudget()) {
383 const ArraySchema* schema = array->array_schema_latest();
384 auto mem_budgets_reader = reader.getBudget();
385 auto num_attrs = mem_budgets_reader.size();
386 for (size_t i = 0; i < num_attrs; i++) {
387 auto mem_budget_reader = mem_budgets_reader[i];
388 std::string attr_name = mem_budget_reader.getAttribute();
389 auto var_size = schema->var_size(attr_name);
390 auto nullable = schema->is_nullable(attr_name);
391
392 if (attr_name == constants::coords || !var_size) {
393 if (nullable) {
394 RETURN_NOT_OK(partitioner->set_result_budget_nullable(
395 attr_name.c_str(),
396 mem_budget_reader.getDataBytes(),
397 mem_budget_reader.getValidityBytes()));
398 } else {
399 RETURN_NOT_OK(partitioner->set_result_budget(
400 attr_name.c_str(), mem_budget_reader.getDataBytes()));
401 }
402 } else {
403 if (nullable) {
404 RETURN_NOT_OK(partitioner->set_result_budget_nullable(
405 attr_name.c_str(),
406 mem_budget_reader.getOffsetBytes(),
407 mem_budget_reader.getDataBytes(),
408 mem_budget_reader.getValidityBytes()));
409 } else {
410 RETURN_NOT_OK(partitioner->set_result_budget(
411 attr_name.c_str(),
412 mem_budget_reader.getOffsetBytes(),
413 mem_budget_reader.getDataBytes()));
414 }
415 }
416 }
417 }
418
419 // Current partition info
420 if (reader.hasCurrent()) {
421 auto partition_info_reader = reader.getCurrent();
422 auto* partition_info = partitioner->current_partition_info();
423 partition_info->start_ = partition_info_reader.getStart();
424 partition_info->end_ = partition_info_reader.getEnd();
425 partition_info->split_multi_range_ =
426 partition_info_reader.getSplitMultiRange();
427 partition_info->partition_ =
428 Subarray(array, layout, reader_stats, dummy_logger, false);
429 RETURN_NOT_OK(subarray_from_capnp(
430 partition_info_reader.getSubarray(), &partition_info->partition_));
431
432 if (compute_current_tile_overlap) {
433 partition_info->partition_.precompute_tile_overlap(
434 partition_info->start_,
435 partition_info->end_,
436 config,
437 compute_tp,
438 true);
439 }
440 }
441
442 // Partitioner state
443 auto state_reader = reader.getState();
444 auto* state = partitioner->state();
445 state->start_ = state_reader.getStart();
446 state->end_ = state_reader.getEnd();
447 auto sr_reader = state_reader.getSingleRange();
448 const unsigned num_sr = sr_reader.size();
449 for (unsigned i = 0; i < num_sr; i++) {
450 auto subarray_reader_ = sr_reader[i];
451 state->single_range_.emplace_back(
452 array, layout, reader_stats, dummy_logger, false);
453 Subarray& subarray_ = state->single_range_.back();
454 RETURN_NOT_OK(subarray_from_capnp(subarray_reader_, &subarray_));
455 }
456 auto m_reader = state_reader.getMultiRange();
457 const unsigned num_m = m_reader.size();
458 for (unsigned i = 0; i < num_m; i++) {
459 auto subarray_reader_ = m_reader[i];
460 state->multi_range_.emplace_back(
461 array, layout, reader_stats, dummy_logger, false);
462 Subarray& subarray_ = state->multi_range_.back();
463 RETURN_NOT_OK(subarray_from_capnp(subarray_reader_, &subarray_));
464 }
465
466 // Overall mem budget
467 RETURN_NOT_OK(partitioner->set_memory_budget(
468 reader.getMemoryBudget(),
469 reader.getMemoryBudgetVar(),
470 reader.getMemoryBudgetValidity()));
471
472 // If cap'n proto object has stats set it on c++ object
473 if (reader.hasStats()) {
474 auto stats = partitioner->stats();
475 // We should always have stats
476 if (stats != nullptr) {
477 RETURN_NOT_OK(stats_from_capnp(reader.getStats(), stats));
478 }
479 }
480
481 return Status::Ok();
482 }
483
read_state_to_capnp(const ArraySchema * schema,const Reader & reader,capnp::QueryReader::Builder * builder)484 Status read_state_to_capnp(
485 const ArraySchema* schema,
486 const Reader& reader,
487 capnp::QueryReader::Builder* builder) {
488 auto read_state = reader.read_state();
489 auto read_state_builder = builder->initReadState();
490 read_state_builder.setOverflowed(read_state->overflowed_);
491 read_state_builder.setUnsplittable(read_state->unsplittable_);
492 read_state_builder.setInitialized(read_state->initialized_);
493
494 if (read_state->initialized_) {
495 auto partitioner_builder = read_state_builder.initSubarrayPartitioner();
496 RETURN_NOT_OK(subarray_partitioner_to_capnp(
497 schema, read_state->partitioner_, &partitioner_builder));
498 }
499
500 return Status::Ok();
501 }
502
index_read_state_to_capnp(const SparseIndexReaderBase::ReadState * read_state,capnp::ReaderIndex::Builder * builder)503 Status index_read_state_to_capnp(
504 const SparseIndexReaderBase::ReadState* read_state,
505 capnp::ReaderIndex::Builder* builder) {
506 auto read_state_builder = builder->initReadState();
507
508 read_state_builder.setDoneAddingResultTiles(
509 read_state->done_adding_result_tiles_);
510
511 auto rcs_builder = read_state_builder.initResultCellSlab(
512 read_state->result_cell_slabs_.size());
513 for (size_t i = 0; i < read_state->result_cell_slabs_.size(); ++i) {
514 rcs_builder[i].setFragIdx(
515 read_state->result_cell_slabs_[i].tile_->frag_idx());
516 rcs_builder[i].setTileIdx(
517 read_state->result_cell_slabs_[i].tile_->tile_idx());
518 rcs_builder[i].setStart(read_state->result_cell_slabs_[i].start_);
519 rcs_builder[i].setLength(read_state->result_cell_slabs_[i].length_);
520 }
521
522 auto frag_tile_idx_builder =
523 read_state_builder.initFragTileIdx(read_state->frag_tile_idx_.size());
524 for (size_t i = 0; i < read_state->frag_tile_idx_.size(); ++i) {
525 frag_tile_idx_builder[i].setTileIdx(read_state->frag_tile_idx_[i].first);
526 frag_tile_idx_builder[i].setCellIdx(read_state->frag_tile_idx_[i].second);
527 }
528
529 return Status::Ok();
530 }
531
dense_read_state_to_capnp(const ArraySchema * schema,const DenseReader & reader,capnp::QueryReader::Builder * builder)532 Status dense_read_state_to_capnp(
533 const ArraySchema* schema,
534 const DenseReader& reader,
535 capnp::QueryReader::Builder* builder) {
536 auto read_state = reader.read_state();
537 auto read_state_builder = builder->initReadState();
538 read_state_builder.setOverflowed(read_state->overflowed_);
539 read_state_builder.setUnsplittable(read_state->unsplittable_);
540 read_state_builder.setInitialized(read_state->initialized_);
541
542 if (read_state->initialized_) {
543 auto partitioner_builder = read_state_builder.initSubarrayPartitioner();
544 RETURN_NOT_OK(subarray_partitioner_to_capnp(
545 schema, read_state->partitioner_, &partitioner_builder));
546 }
547
548 return Status::Ok();
549 }
550
read_state_from_capnp(const Array * array,const capnp::ReadState::Reader & read_state_reader,Query * query,Reader * reader,ThreadPool * compute_tp)551 Status read_state_from_capnp(
552 const Array* array,
553 const capnp::ReadState::Reader& read_state_reader,
554 Query* query,
555 Reader* reader,
556 ThreadPool* compute_tp) {
557 auto read_state = reader->read_state();
558
559 read_state->overflowed_ = read_state_reader.getOverflowed();
560 read_state->unsplittable_ = read_state_reader.getUnsplittable();
561 read_state->initialized_ = read_state_reader.getInitialized();
562
563 // Subarray partitioner
564 if (read_state_reader.hasSubarrayPartitioner()) {
565 RETURN_NOT_OK(subarray_partitioner_from_capnp(
566 reader->stats(),
567 query->config(),
568 array,
569 read_state_reader.getSubarrayPartitioner(),
570 &read_state->partitioner_,
571 compute_tp,
572 // If the current partition is unsplittable, this means we need to make
573 // sure the tile_overlap for the current is computed because we won't go
574 // to the next partition
575 read_state->unsplittable_));
576 }
577
578 return Status::Ok();
579 }
580
index_read_state_from_capnp(const ArraySchema * schema,const capnp::ReadStateIndex::Reader & read_state_reader,SparseIndexReaderBase * reader)581 Status index_read_state_from_capnp(
582 const ArraySchema* schema,
583 const capnp::ReadStateIndex::Reader& read_state_reader,
584 SparseIndexReaderBase* reader) {
585 auto read_state = reader->read_state();
586 const auto* domain = schema->domain();
587
588 read_state->done_adding_result_tiles_ =
589 read_state_reader.getDoneAddingResultTiles();
590
591 assert(read_state_reader.hasResultCellSlab());
592 RETURN_NOT_OK(reader->clear_result_tiles());
593 read_state->result_cell_slabs_.clear();
594
595 std::unordered_map<
596 std::pair<unsigned, uint64_t>,
597 ResultTile*,
598 tiledb::sm::utils::hash::pair_hash>
599 result_tile_map;
600 for (const auto rcs : read_state_reader.getResultCellSlab()) {
601 auto start = rcs.getStart();
602 auto length = rcs.getLength();
603 auto frag_idx = rcs.getFragIdx();
604 auto tile_idx = rcs.getTileIdx();
605
606 ResultTile* rt = nullptr;
607 auto it =
608 result_tile_map.find(std::pair<unsigned, uint64_t>(frag_idx, tile_idx));
609 if (it != result_tile_map.end()) {
610 rt = it->second;
611 } else {
612 rt = reader->add_result_tile_unsafe(frag_idx, tile_idx, domain);
613 result_tile_map.emplace(
614 std::pair<unsigned, uint64_t>(frag_idx, tile_idx), rt);
615 }
616
617 read_state->result_cell_slabs_.emplace_back(rt, start, length);
618 }
619
620 assert(read_state_reader.hasFragTileIdx());
621 read_state->frag_tile_idx_.clear();
622 for (const auto rcs : read_state_reader.getFragTileIdx()) {
623 auto tile_idx = rcs.getTileIdx();
624 auto cell_idx = rcs.getCellIdx();
625
626 read_state->frag_tile_idx_.emplace_back(tile_idx, cell_idx);
627 }
628
629 return Status::Ok();
630 }
631
dense_read_state_from_capnp(const Array * array,const capnp::ReadState::Reader & read_state_reader,Query * query,DenseReader * reader,ThreadPool * compute_tp)632 Status dense_read_state_from_capnp(
633 const Array* array,
634 const capnp::ReadState::Reader& read_state_reader,
635 Query* query,
636 DenseReader* reader,
637 ThreadPool* compute_tp) {
638 auto read_state = reader->read_state();
639
640 read_state->overflowed_ = read_state_reader.getOverflowed();
641 read_state->unsplittable_ = read_state_reader.getUnsplittable();
642 read_state->initialized_ = read_state_reader.getInitialized();
643
644 // Subarray partitioner
645 if (read_state_reader.hasSubarrayPartitioner()) {
646 RETURN_NOT_OK(subarray_partitioner_from_capnp(
647 reader->stats(),
648 query->config(),
649 array,
650 read_state_reader.getSubarrayPartitioner(),
651 &read_state->partitioner_,
652 compute_tp,
653 // If the current partition is unsplittable, this means we need to make
654 // sure the tile_overlap for the current is computed because we won't go
655 // to the next partition
656 read_state->unsplittable_));
657 }
658
659 return Status::Ok();
660 }
661
condition_to_capnp(const QueryCondition & condition,capnp::Condition::Builder * condition_builder)662 Status condition_to_capnp(
663 const QueryCondition& condition,
664 capnp::Condition::Builder* condition_builder) {
665 // Serialize the clauses.
666 const std::vector<QueryCondition::Clause> clauses = condition.clauses();
667 assert(!clauses.empty());
668 auto clause_builder = condition_builder->initClauses(clauses.size());
669 for (size_t i = 0; i < clauses.size(); ++i) {
670 clause_builder[i].setFieldName(clauses[i].field_name_);
671
672 // Copy the condition value into a capnp vector of bytes.
673 const ByteVecValue value = clauses[i].condition_value_data_;
674 auto capnpValue = kj::Vector<uint8_t>();
675 capnpValue.addAll(kj::ArrayPtr<uint8_t>(
676 const_cast<uint8_t*>(reinterpret_cast<const uint8_t*>(value.data())),
677 value.size()));
678
679 // Store the condition value vector of bytes.
680 clause_builder[i].setValue(capnpValue.asPtr());
681
682 const std::string op_str = query_condition_op_str(clauses[i].op_);
683 clause_builder[i].setOp(op_str);
684 }
685
686 // Serialize the combination ops.
687 const std::vector<QueryConditionCombinationOp> combination_ops =
688 condition.combination_ops();
689 if (!combination_ops.empty()) {
690 auto combination_ops_builder =
691 condition_builder->initClauseCombinationOps(combination_ops.size());
692 for (size_t i = 0; i < combination_ops.size(); ++i) {
693 const std::string op_str =
694 query_condition_combination_op_str(combination_ops[i]);
695 combination_ops_builder.set(i, op_str);
696 }
697 }
698
699 return Status::Ok();
700 }
701
reader_to_capnp(const Query & query,const Reader & reader,capnp::QueryReader::Builder * reader_builder)702 Status reader_to_capnp(
703 const Query& query,
704 const Reader& reader,
705 capnp::QueryReader::Builder* reader_builder) {
706 auto array_schema = query.array_schema();
707
708 // Subarray layout
709 const auto& layout = layout_str(query.layout());
710 reader_builder->setLayout(layout);
711
712 // Subarray
713 auto subarray_builder = reader_builder->initSubarray();
714 RETURN_NOT_OK(
715 subarray_to_capnp(array_schema, query.subarray(), &subarray_builder));
716
717 // Read state
718 RETURN_NOT_OK(read_state_to_capnp(array_schema, reader, reader_builder));
719
720 const QueryCondition* condition = query.condition();
721 if (!condition->empty()) {
722 auto condition_builder = reader_builder->initCondition();
723 RETURN_NOT_OK(condition_to_capnp(*condition, &condition_builder));
724 }
725
726 // If stats object exists set its cap'n proto object
727 stats::Stats* stats = reader.stats();
728 if (stats != nullptr) {
729 auto stats_builder = reader_builder->initStats();
730 RETURN_NOT_OK(stats_to_capnp(*stats, &stats_builder));
731 }
732
733 return Status::Ok();
734 }
735
index_reader_to_capnp(const Query & query,const SparseIndexReaderBase & reader,capnp::ReaderIndex::Builder * reader_builder)736 Status index_reader_to_capnp(
737 const Query& query,
738 const SparseIndexReaderBase& reader,
739 capnp::ReaderIndex::Builder* reader_builder) {
740 auto array_schema = query.array_schema();
741
742 // Subarray layout
743 const auto& layout = layout_str(query.layout());
744 reader_builder->setLayout(layout);
745
746 // Subarray
747 auto subarray_builder = reader_builder->initSubarray();
748 RETURN_NOT_OK(
749 subarray_to_capnp(array_schema, query.subarray(), &subarray_builder));
750
751 // Read state
752 RETURN_NOT_OK(index_read_state_to_capnp(reader.read_state(), reader_builder));
753
754 const QueryCondition* condition = query.condition();
755 if (!condition->empty()) {
756 auto condition_builder = reader_builder->initCondition();
757 RETURN_NOT_OK(condition_to_capnp(*condition, &condition_builder));
758 }
759
760 // If stats object exists set its cap'n proto object
761 stats::Stats* stats = reader.stats();
762 if (stats != nullptr) {
763 auto stats_builder = reader_builder->initStats();
764 RETURN_NOT_OK(stats_to_capnp(*stats, &stats_builder));
765 }
766
767 return Status::Ok();
768 }
769
dense_reader_to_capnp(const Query & query,const DenseReader & reader,capnp::QueryReader::Builder * reader_builder)770 Status dense_reader_to_capnp(
771 const Query& query,
772 const DenseReader& reader,
773 capnp::QueryReader::Builder* reader_builder) {
774 auto array_schema = query.array_schema();
775
776 // Subarray layout
777 const auto& layout = layout_str(query.layout());
778 reader_builder->setLayout(layout);
779
780 // Subarray
781 auto subarray_builder = reader_builder->initSubarray();
782 RETURN_NOT_OK(
783 subarray_to_capnp(array_schema, query.subarray(), &subarray_builder));
784
785 // Read state
786 RETURN_NOT_OK(
787 dense_read_state_to_capnp(array_schema, reader, reader_builder));
788
789 const QueryCondition* condition = query.condition();
790 if (!condition->empty()) {
791 auto condition_builder = reader_builder->initCondition();
792 RETURN_NOT_OK(condition_to_capnp(*condition, &condition_builder));
793 }
794
795 // If stats object exists set its cap'n proto object
796 stats::Stats* stats = reader.stats();
797 if (stats != nullptr) {
798 auto stats_builder = reader_builder->initStats();
799 RETURN_NOT_OK(stats_to_capnp(*stats, &stats_builder));
800 }
801
802 return Status::Ok();
803 }
804
condition_from_capnp(const capnp::Condition::Reader & condition_reader,QueryCondition * const condition)805 Status condition_from_capnp(
806 const capnp::Condition::Reader& condition_reader,
807 QueryCondition* const condition) {
808 // Deserialize the clauses.
809 std::vector<QueryCondition::Clause> clauses;
810 assert(condition_reader.hasClauses());
811 for (const auto clause : condition_reader.getClauses()) {
812 std::string field_name = clause.getFieldName();
813
814 auto condition_value = clause.getValue();
815
816 QueryConditionOp op = QueryConditionOp::LT;
817 RETURN_NOT_OK(query_condition_op_enum(clause.getOp(), &op));
818
819 clauses.emplace_back(
820 std::move(field_name),
821 condition_value.asBytes().begin(),
822 condition_value.size(),
823 op);
824 }
825 condition->set_clauses(std::move(clauses));
826
827 // Deserialize the combination ops.
828 if (condition_reader.hasClauseCombinationOps()) {
829 std::vector<QueryConditionCombinationOp> combination_ops;
830 for (const auto combination_op_str :
831 condition_reader.getClauseCombinationOps()) {
832 QueryConditionCombinationOp combination_op =
833 QueryConditionCombinationOp::AND;
834 RETURN_NOT_OK(query_condition_combination_op_enum(
835 combination_op_str, &combination_op));
836 combination_ops.emplace_back(combination_op);
837 }
838 condition->set_combination_ops(std::move(combination_ops));
839 }
840
841 return Status::Ok();
842 }
843
reader_from_capnp(const capnp::QueryReader::Reader & reader_reader,Query * query,Reader * reader,ThreadPool * compute_tp)844 Status reader_from_capnp(
845 const capnp::QueryReader::Reader& reader_reader,
846 Query* query,
847 Reader* reader,
848 ThreadPool* compute_tp) {
849 auto array = query->array();
850
851 // Layout
852 Layout layout = Layout::ROW_MAJOR;
853 RETURN_NOT_OK(layout_enum(reader_reader.getLayout(), &layout));
854 RETURN_NOT_OK(query->set_layout_unsafe(layout));
855
856 // Subarray
857 Subarray subarray(array, layout, reader->stats(), dummy_logger, false);
858 auto subarray_reader = reader_reader.getSubarray();
859 RETURN_NOT_OK(subarray_from_capnp(subarray_reader, &subarray));
860 RETURN_NOT_OK(query->set_subarray_unsafe(subarray));
861
862 // Read state
863 if (reader_reader.hasReadState())
864 RETURN_NOT_OK(read_state_from_capnp(
865 array, reader_reader.getReadState(), query, reader, compute_tp));
866
867 // Query condition
868 if (reader_reader.hasCondition()) {
869 auto condition_reader = reader_reader.getCondition();
870 QueryCondition condition;
871 RETURN_NOT_OK(condition_from_capnp(condition_reader, &condition));
872 RETURN_NOT_OK(query->set_condition(condition));
873 }
874
875 // If cap'n proto object has stats set it on c++ object
876 if (reader_reader.hasStats()) {
877 stats::Stats* stats = reader->stats();
878 // We should always have a stats here
879 if (stats != nullptr) {
880 RETURN_NOT_OK(stats_from_capnp(reader_reader.getStats(), stats));
881 }
882 }
883
884 return Status::Ok();
885 }
886
index_reader_from_capnp(const ArraySchema * schema,const capnp::ReaderIndex::Reader & reader_reader,Query * query,SparseIndexReaderBase * reader)887 Status index_reader_from_capnp(
888 const ArraySchema* schema,
889 const capnp::ReaderIndex::Reader& reader_reader,
890 Query* query,
891 SparseIndexReaderBase* reader) {
892 auto array = query->array();
893
894 // Layout
895 Layout layout = Layout::ROW_MAJOR;
896 RETURN_NOT_OK(layout_enum(reader_reader.getLayout(), &layout));
897
898 // Subarray
899 Subarray subarray(array, layout, reader->stats(), dummy_logger, false);
900 auto subarray_reader = reader_reader.getSubarray();
901 RETURN_NOT_OK(subarray_from_capnp(subarray_reader, &subarray));
902 RETURN_NOT_OK(query->set_subarray_unsafe(subarray));
903
904 // Read state
905 if (reader_reader.hasReadState())
906 RETURN_NOT_OK(index_read_state_from_capnp(
907 schema, reader_reader.getReadState(), reader));
908
909 // Query condition
910 if (reader_reader.hasCondition()) {
911 auto condition_reader = reader_reader.getCondition();
912 QueryCondition condition;
913 RETURN_NOT_OK(condition_from_capnp(condition_reader, &condition));
914 RETURN_NOT_OK(query->set_condition(condition));
915 }
916
917 // If cap'n proto object has stats set it on c++ object
918 if (reader_reader.hasStats()) {
919 stats::Stats* stats = reader->stats();
920 // We should always have a stats here
921 if (stats != nullptr) {
922 RETURN_NOT_OK(stats_from_capnp(reader_reader.getStats(), stats));
923 }
924 }
925
926 return Status::Ok();
927 }
928
dense_reader_from_capnp(const ArraySchema * schema,const capnp::QueryReader::Reader & reader_reader,Query * query,DenseReader * reader,ThreadPool * compute_tp)929 Status dense_reader_from_capnp(
930 const ArraySchema* schema,
931 const capnp::QueryReader::Reader& reader_reader,
932 Query* query,
933 DenseReader* reader,
934 ThreadPool* compute_tp) {
935 auto array = query->array();
936
937 // Layout
938 Layout layout = Layout::ROW_MAJOR;
939 RETURN_NOT_OK(layout_enum(reader_reader.getLayout(), &layout));
940
941 // Subarray
942 Subarray subarray(array, layout, reader->stats(), dummy_logger, false);
943 auto subarray_reader = reader_reader.getSubarray();
944 RETURN_NOT_OK(subarray_from_capnp(subarray_reader, &subarray));
945 RETURN_NOT_OK(query->set_subarray_unsafe(subarray));
946
947 // Read state
948 if (reader_reader.hasReadState())
949 RETURN_NOT_OK(dense_read_state_from_capnp(
950 array, reader_reader.getReadState(), query, reader, compute_tp));
951
952 // Query condition
953 if (reader_reader.hasCondition()) {
954 auto condition_reader = reader_reader.getCondition();
955 QueryCondition condition;
956 RETURN_NOT_OK(condition_from_capnp(condition_reader, &condition));
957 RETURN_NOT_OK(query->set_condition(condition));
958 }
959
960 // If cap'n proto object has stats set it on c++ object
961 if (reader_reader.hasStats()) {
962 stats::Stats* stats = reader->stats();
963 // We should always have a stats here
964 if (stats != nullptr) {
965 RETURN_NOT_OK(stats_from_capnp(reader_reader.getStats(), stats));
966 }
967 }
968
969 return Status::Ok();
970 }
971
writer_to_capnp(const Query & query,Writer & writer,capnp::Writer::Builder * writer_builder)972 Status writer_to_capnp(
973 const Query& query,
974 Writer& writer,
975 capnp::Writer::Builder* writer_builder) {
976 writer_builder->setCheckCoordDups(writer.get_check_coord_dups());
977 writer_builder->setCheckCoordOOB(writer.get_check_coord_oob());
978 writer_builder->setDedupCoords(writer.get_dedup_coords());
979
980 const auto* array_schema = query.array_schema();
981
982 if (array_schema->dense()) {
983 std::vector<uint8_t> subarray_flat;
984 RETURN_NOT_OK(query.subarray()->to_byte_vec(&subarray_flat));
985 auto subarray_builder = writer_builder->initSubarray();
986 RETURN_NOT_OK(utils::serialize_subarray(
987 subarray_builder, array_schema, &subarray_flat[0]));
988 }
989
990 // Subarray
991 const auto subarray_ranges = query.subarray();
992 if (!subarray_ranges->empty()) {
993 auto subarray_builder = writer_builder->initSubarrayRanges();
994 RETURN_NOT_OK(
995 subarray_to_capnp(array_schema, subarray_ranges, &subarray_builder));
996 }
997
998 // If stats object exists set its cap'n proto object
999 stats::Stats* stats = writer.stats();
1000 if (stats != nullptr) {
1001 auto stats_builder = writer_builder->initStats();
1002 RETURN_NOT_OK(stats_to_capnp(*stats, &stats_builder));
1003 }
1004
1005 return Status::Ok();
1006 }
1007
writer_from_capnp(const capnp::Writer::Reader & writer_reader,Writer * writer)1008 Status writer_from_capnp(
1009 const capnp::Writer::Reader& writer_reader, Writer* writer) {
1010 writer->set_check_coord_dups(writer_reader.getCheckCoordDups());
1011 writer->set_check_coord_oob(writer_reader.getCheckCoordOOB());
1012 writer->set_dedup_coords(writer_reader.getDedupCoords());
1013
1014 // If cap'n proto object has stats set it on c++ object
1015 if (writer_reader.hasStats()) {
1016 stats::Stats* stats = writer->stats();
1017 // We should always have a stats here
1018 if (stats != nullptr) {
1019 RETURN_NOT_OK(stats_from_capnp(writer_reader.getStats(), stats));
1020 }
1021 }
1022
1023 return Status::Ok();
1024 }
1025
query_to_capnp(Query & query,capnp::Query::Builder * query_builder)1026 Status query_to_capnp(Query& query, capnp::Query::Builder* query_builder) {
1027 // For easy reference
1028 auto layout = query.layout();
1029 auto type = query.type();
1030 auto array = query.array();
1031
1032 if (layout == Layout::GLOBAL_ORDER && query.type() == QueryType::WRITE)
1033 return LOG_STATUS(
1034 Status::SerializationError("Cannot serialize; global order "
1035 "serialization not supported for writes."));
1036
1037 if (array == nullptr)
1038 return LOG_STATUS(
1039 Status::SerializationError("Cannot serialize; array is null."));
1040
1041 const auto* schema = query.array_schema();
1042 if (schema == nullptr)
1043 return LOG_STATUS(
1044 Status::SerializationError("Cannot serialize; array schema is null."));
1045
1046 const auto* domain = schema->domain();
1047 if (domain == nullptr)
1048 return LOG_STATUS(
1049 Status::SerializationError("Cannot serialize; array domain is null."));
1050
1051 // Serialize basic fields
1052 query_builder->setType(query_type_str(type));
1053 query_builder->setLayout(layout_str(layout));
1054 query_builder->setStatus(query_status_str(query.status()));
1055
1056 // Serialize array
1057 if (query.array() != nullptr) {
1058 auto builder = query_builder->initArray();
1059 RETURN_NOT_OK(array_to_capnp(*array, &builder));
1060 }
1061
1062 // Serialize attribute buffer metadata
1063 const auto buffer_names = query.buffer_names();
1064 auto attr_buffers_builder =
1065 query_builder->initAttributeBufferHeaders(buffer_names.size());
1066 uint64_t total_fixed_len_bytes = 0;
1067 uint64_t total_var_len_bytes = 0;
1068 uint64_t total_validity_len_bytes = 0;
1069 for (uint64_t i = 0; i < buffer_names.size(); i++) {
1070 auto attr_buffer_builder = attr_buffers_builder[i];
1071 const auto& name = buffer_names[i];
1072 const auto& buff = query.buffer(name);
1073 attr_buffer_builder.setName(name);
1074 if (buff.buffer_var_ != nullptr && buff.buffer_var_size_ != nullptr) {
1075 // Variable-sized buffer
1076 total_var_len_bytes += *buff.buffer_var_size_;
1077 attr_buffer_builder.setVarLenBufferSizeInBytes(*buff.buffer_var_size_);
1078 total_fixed_len_bytes += *buff.buffer_size_;
1079 attr_buffer_builder.setFixedLenBufferSizeInBytes(*buff.buffer_size_);
1080
1081 // Set original user requested sizes
1082 attr_buffer_builder.setOriginalVarLenBufferSizeInBytes(
1083 buff.original_buffer_var_size_);
1084 attr_buffer_builder.setOriginalFixedLenBufferSizeInBytes(
1085 buff.original_buffer_size_);
1086 } else if (buff.buffer_ != nullptr && buff.buffer_size_ != nullptr) {
1087 // Fixed-length buffer
1088 total_fixed_len_bytes += *buff.buffer_size_;
1089 attr_buffer_builder.setFixedLenBufferSizeInBytes(*buff.buffer_size_);
1090 attr_buffer_builder.setVarLenBufferSizeInBytes(0);
1091
1092 // Set original user requested sizes
1093 attr_buffer_builder.setOriginalVarLenBufferSizeInBytes(0);
1094 attr_buffer_builder.setOriginalFixedLenBufferSizeInBytes(
1095 buff.original_buffer_size_);
1096 } else {
1097 assert(false);
1098 }
1099
1100 if (buff.validity_vector_.buffer_size() != nullptr) {
1101 total_validity_len_bytes += *buff.validity_vector_.buffer_size();
1102 attr_buffer_builder.setValidityLenBufferSizeInBytes(
1103 *buff.validity_vector_.buffer_size());
1104
1105 // Set original user requested sizes
1106 attr_buffer_builder.setOriginalValidityLenBufferSizeInBytes(
1107 buff.original_validity_vector_size_);
1108 }
1109 }
1110
1111 query_builder->setTotalFixedLengthBufferBytes(total_fixed_len_bytes);
1112 query_builder->setTotalVarLenBufferBytes(total_var_len_bytes);
1113 query_builder->setTotalValidityBufferBytes(total_validity_len_bytes);
1114
1115 if (type == QueryType::READ) {
1116 bool all_dense = true;
1117 for (auto& frag_md : array->fragment_metadata())
1118 all_dense &= frag_md->dense();
1119 if (query.use_refactored_sparse_unordered_with_dups_reader() &&
1120 !schema->dense() && layout == Layout::UNORDERED &&
1121 schema->allows_dups()) {
1122 auto builder = query_builder->initReaderIndex();
1123 auto reader = (SparseUnorderedWithDupsReader*)query.strategy();
1124
1125 query_builder->setVarOffsetsMode(reader->offsets_mode());
1126 query_builder->setVarOffsetsAddExtraElement(
1127 reader->offsets_extra_element());
1128 query_builder->setVarOffsetsBitsize(reader->offsets_bitsize());
1129 RETURN_NOT_OK(index_reader_to_capnp(query, *reader, &builder));
1130 } else if (
1131 query.use_refactored_sparse_global_order_reader() && !schema->dense() &&
1132 (layout == Layout::GLOBAL_ORDER ||
1133 (layout == Layout::UNORDERED && query.subarray()->range_num() <= 1))) {
1134 auto builder = query_builder->initReaderIndex();
1135 auto reader = (SparseGlobalOrderReader*)query.strategy();
1136
1137 query_builder->setVarOffsetsMode(reader->offsets_mode());
1138 query_builder->setVarOffsetsAddExtraElement(
1139 reader->offsets_extra_element());
1140 query_builder->setVarOffsetsBitsize(reader->offsets_bitsize());
1141 RETURN_NOT_OK(index_reader_to_capnp(query, *reader, &builder));
1142 } else if (
1143 query.use_refactored_dense_reader() && all_dense && schema->dense()) {
1144 auto builder = query_builder->initDenseReader();
1145 auto reader = (DenseReader*)query.strategy();
1146
1147 query_builder->setVarOffsetsMode(reader->offsets_mode());
1148 query_builder->setVarOffsetsAddExtraElement(
1149 reader->offsets_extra_element());
1150 query_builder->setVarOffsetsBitsize(reader->offsets_bitsize());
1151 RETURN_NOT_OK(dense_reader_to_capnp(query, *reader, &builder));
1152 } else {
1153 auto builder = query_builder->initReader();
1154 auto reader = (Reader*)query.strategy();
1155
1156 query_builder->setVarOffsetsMode(reader->offsets_mode());
1157 query_builder->setVarOffsetsAddExtraElement(
1158 reader->offsets_extra_element());
1159 query_builder->setVarOffsetsBitsize(reader->offsets_bitsize());
1160 RETURN_NOT_OK(reader_to_capnp(query, *reader, &builder));
1161 }
1162 } else {
1163 auto builder = query_builder->initWriter();
1164 auto writer = (Writer*)query.strategy();
1165
1166 query_builder->setVarOffsetsMode(writer->offsets_mode());
1167 query_builder->setVarOffsetsAddExtraElement(
1168 writer->offsets_extra_element());
1169 query_builder->setVarOffsetsBitsize(writer->offsets_bitsize());
1170 RETURN_NOT_OK(writer_to_capnp(query, *writer, &builder));
1171 }
1172
1173 // Serialize Config
1174 const Config* config = query.config();
1175 auto config_builder = query_builder->initConfig();
1176 RETURN_NOT_OK(config_to_capnp(config, &config_builder));
1177
1178 // If stats object exists set its cap'n proto object
1179 stats::Stats* stats = query.stats();
1180 if (stats != nullptr) {
1181 auto stats_builder = query_builder->initStats();
1182 RETURN_NOT_OK(stats_to_capnp(*stats, &stats_builder));
1183 }
1184
1185 return Status::Ok();
1186 }
1187
query_from_capnp(const capnp::Query::Reader & query_reader,const SerializationContext context,void * buffer_start,CopyState * const copy_state,Query * const query,ThreadPool * compute_tp)1188 Status query_from_capnp(
1189 const capnp::Query::Reader& query_reader,
1190 const SerializationContext context,
1191 void* buffer_start,
1192 CopyState* const copy_state,
1193 Query* const query,
1194 ThreadPool* compute_tp) {
1195 using namespace tiledb::sm;
1196
1197 auto type = query->type();
1198 auto array = query->array();
1199
1200 const auto* schema = query->array_schema();
1201 if (schema == nullptr)
1202 return LOG_STATUS(Status::SerializationError(
1203 "Cannot deserialize; array schema is null."));
1204
1205 const auto* domain = schema->domain();
1206 if (domain == nullptr)
1207 return LOG_STATUS(Status::SerializationError(
1208 "Cannot deserialize; array domain is null."));
1209
1210 if (array == nullptr)
1211 return LOG_STATUS(Status::SerializationError(
1212 "Cannot deserialize; array pointer is null."));
1213
1214 // Deserialize query type (sanity check).
1215 QueryType query_type = QueryType::READ;
1216 RETURN_NOT_OK(query_type_enum(query_reader.getType().cStr(), &query_type));
1217 if (query_type != type)
1218 return LOG_STATUS(Status::SerializationError(
1219 "Cannot deserialize; Query opened for " + query_type_str(type) +
1220 " but got serialized type for " + query_reader.getType().cStr()));
1221
1222 // Deserialize layout.
1223 Layout layout = Layout::UNORDERED;
1224 RETURN_NOT_OK(layout_enum(query_reader.getLayout().cStr(), &layout));
1225 RETURN_NOT_OK(query->set_layout_unsafe(layout));
1226
1227 // Deserialize array instance.
1228 RETURN_NOT_OK(array_from_capnp(query_reader.getArray(), array));
1229
1230 // Deserialize Config
1231 if (query_reader.hasConfig()) {
1232 tdb_unique_ptr<Config> decoded_config = nullptr;
1233 auto config_reader = query_reader.getConfig();
1234 RETURN_NOT_OK(config_from_capnp(config_reader, &decoded_config));
1235 if (decoded_config != nullptr) {
1236 RETURN_NOT_OK(query->set_config(*decoded_config));
1237 }
1238 }
1239
1240 // Deserialize and set attribute buffers.
1241 if (!query_reader.hasAttributeBufferHeaders())
1242 return LOG_STATUS(Status::SerializationError(
1243 "Cannot deserialize; no attribute buffer headers in message."));
1244
1245 auto buffer_headers = query_reader.getAttributeBufferHeaders();
1246 auto attribute_buffer_start = static_cast<char*>(buffer_start);
1247 for (auto buffer_header : buffer_headers) {
1248 const std::string name = buffer_header.getName().cStr();
1249
1250 // Get buffer sizes required
1251 const uint64_t fixedlen_size = buffer_header.getFixedLenBufferSizeInBytes();
1252 const uint64_t varlen_size = buffer_header.getVarLenBufferSizeInBytes();
1253 const uint64_t validitylen_size =
1254 buffer_header.getValidityLenBufferSizeInBytes();
1255
1256 // Get current copy state for the attribute (contains destination offsets
1257 // for memcpy into user buffers).
1258 QueryBufferCopyState* attr_copy_state = nullptr;
1259 if (copy_state != nullptr)
1260 attr_copy_state = &(*copy_state)[name];
1261
1262 // Get any buffers already set on this query object.
1263 uint64_t* existing_offset_buffer = nullptr;
1264 uint64_t existing_offset_buffer_size = 0;
1265 void* existing_buffer = nullptr;
1266 uint64_t existing_buffer_size = 0;
1267 uint8_t* existing_validity_buffer = nullptr;
1268 uint64_t existing_validity_buffer_size = 0;
1269
1270 // For writes and read (client side) we need ptrs to set the sizes properly
1271 uint64_t* existing_buffer_size_ptr = nullptr;
1272 uint64_t* existing_offset_buffer_size_ptr = nullptr;
1273 uint64_t* existing_validity_buffer_size_ptr = nullptr;
1274
1275 auto var_size = schema->var_size(name);
1276 auto nullable = schema->is_nullable(name);
1277 if (type == QueryType::READ && context == SerializationContext::SERVER) {
1278 const QueryBuffer& query_buffer = query->buffer(name);
1279 // We use the query_buffer directly in order to get the original buffer
1280 // sizes This avoid a problem where an incomplete query will change the
1281 // users buffer size to the smaller results and we end up not being able
1282 // to correctly calculate if the new results can fit into the users buffer
1283 if (var_size) {
1284 if (!nullable) {
1285 existing_offset_buffer = static_cast<uint64_t*>(query_buffer.buffer_);
1286 existing_offset_buffer_size = query_buffer.original_buffer_size_;
1287 existing_buffer = query_buffer.buffer_var_;
1288 existing_buffer_size = query_buffer.original_buffer_var_size_;
1289 } else {
1290 existing_offset_buffer = static_cast<uint64_t*>(query_buffer.buffer_);
1291 existing_offset_buffer_size = query_buffer.original_buffer_size_;
1292 existing_buffer = query_buffer.buffer_var_;
1293 existing_buffer_size = query_buffer.original_buffer_var_size_;
1294 existing_validity_buffer = query_buffer.validity_vector_.buffer();
1295 existing_validity_buffer_size =
1296 query_buffer.original_validity_vector_size_;
1297 }
1298 } else {
1299 if (!nullable) {
1300 existing_buffer = query_buffer.buffer_;
1301 existing_buffer_size = query_buffer.original_buffer_size_;
1302 } else {
1303 existing_buffer = query_buffer.buffer_;
1304 existing_buffer_size = query_buffer.original_buffer_size_;
1305 existing_validity_buffer = query_buffer.validity_vector_.buffer();
1306 existing_validity_buffer_size =
1307 query_buffer.original_validity_vector_size_;
1308 }
1309 }
1310 } else {
1311 // For writes we need to use get_buffer and clientside
1312 if (var_size) {
1313 if (!nullable) {
1314 RETURN_NOT_OK(query->get_data_buffer(
1315 name.c_str(), &existing_buffer, &existing_buffer_size_ptr));
1316 RETURN_NOT_OK(query->get_offsets_buffer(
1317 name.c_str(),
1318 &existing_offset_buffer,
1319 &existing_offset_buffer_size_ptr));
1320
1321 if (existing_offset_buffer_size_ptr != nullptr)
1322 existing_offset_buffer_size = *existing_offset_buffer_size_ptr;
1323 if (existing_buffer_size_ptr != nullptr)
1324 existing_buffer_size = *existing_buffer_size_ptr;
1325 } else {
1326 RETURN_NOT_OK(query->get_data_buffer(
1327 name.c_str(), &existing_buffer, &existing_buffer_size_ptr));
1328 RETURN_NOT_OK(query->get_offsets_buffer(
1329 name.c_str(),
1330 &existing_offset_buffer,
1331 &existing_offset_buffer_size_ptr));
1332 RETURN_NOT_OK(query->get_validity_buffer(
1333 name.c_str(),
1334 &existing_validity_buffer,
1335 &existing_validity_buffer_size_ptr));
1336
1337 if (existing_offset_buffer_size_ptr != nullptr)
1338 existing_offset_buffer_size = *existing_offset_buffer_size_ptr;
1339 if (existing_buffer_size_ptr != nullptr)
1340 existing_buffer_size = *existing_buffer_size_ptr;
1341 if (existing_validity_buffer_size_ptr != nullptr)
1342 existing_validity_buffer_size = *existing_validity_buffer_size_ptr;
1343 }
1344 } else {
1345 if (!nullable) {
1346 RETURN_NOT_OK(query->get_data_buffer(
1347 name.c_str(), &existing_buffer, &existing_buffer_size_ptr));
1348
1349 if (existing_buffer_size_ptr != nullptr)
1350 existing_buffer_size = *existing_buffer_size_ptr;
1351 } else {
1352 RETURN_NOT_OK(query->get_data_buffer(
1353 name.c_str(), &existing_buffer, &existing_buffer_size_ptr));
1354 RETURN_NOT_OK(query->get_validity_buffer(
1355 name.c_str(),
1356 &existing_validity_buffer,
1357 &existing_validity_buffer_size_ptr));
1358
1359 if (existing_buffer_size_ptr != nullptr)
1360 existing_buffer_size = *existing_buffer_size_ptr;
1361 if (existing_validity_buffer_size_ptr != nullptr)
1362 existing_validity_buffer_size = *existing_validity_buffer_size_ptr;
1363 }
1364 }
1365 }
1366
1367 if (context == SerializationContext::CLIENT) {
1368 // For queries on the client side, we require that buffers have been
1369 // set by the user, and that they are large enough for all the serialized
1370 // data.
1371 const uint64_t curr_data_size =
1372 attr_copy_state == nullptr ? 0 : attr_copy_state->data_size;
1373 const uint64_t data_size_left = existing_buffer_size - curr_data_size;
1374 const uint64_t curr_offset_size =
1375 attr_copy_state == nullptr ? 0 : attr_copy_state->offset_size;
1376 const uint64_t offset_size_left =
1377 existing_offset_buffer_size == 0 ?
1378 0 :
1379 existing_offset_buffer_size - curr_offset_size;
1380 const uint64_t curr_validity_size =
1381 attr_copy_state == nullptr ? 0 : attr_copy_state->validity_size;
1382 const uint64_t validity_size_left =
1383 existing_validity_buffer_size == 0 ?
1384 0 :
1385 existing_validity_buffer_size - curr_validity_size;
1386
1387 const bool has_mem_for_data =
1388 (var_size && data_size_left >= varlen_size) ||
1389 (!var_size && data_size_left >= fixedlen_size);
1390 const bool has_mem_for_offset =
1391 (var_size && offset_size_left >= fixedlen_size) || !var_size;
1392 const bool has_mem_for_validity = validity_size_left >= validitylen_size;
1393 if (!has_mem_for_data || !has_mem_for_offset || !has_mem_for_validity) {
1394 return LOG_STATUS(Status::SerializationError(
1395 "Error deserializing read query; buffer too small for buffer "
1396 "'" +
1397 name + "'."));
1398 }
1399
1400 // For reads, copy the response data into user buffers. For writes,
1401 // nothing to do.
1402 if (type == QueryType::READ) {
1403 if (var_size) {
1404 // Var size attribute; buffers already set.
1405 char* offset_dest = (char*)existing_offset_buffer + curr_offset_size;
1406 char* data_dest = (char*)existing_buffer + curr_data_size;
1407 char* validity_dest =
1408 (char*)existing_validity_buffer + curr_validity_size;
1409 uint64_t fixedlen_size_to_copy = fixedlen_size;
1410
1411 // If the last query included an extra offset we will skip the first
1412 // offset in this query The first offset is the 0 position which ends
1413 // up the same as the extra offset in the last query 0th is converted
1414 // below to curr_data_size which is also the n+1 offset in arrow mode
1415 if (attr_copy_state != nullptr &&
1416 attr_copy_state->last_query_added_extra_offset) {
1417 attribute_buffer_start += sizeof(uint64_t);
1418 fixedlen_size_to_copy -= sizeof(uint64_t);
1419 }
1420
1421 std::memcpy(
1422 offset_dest, attribute_buffer_start, fixedlen_size_to_copy);
1423 attribute_buffer_start += fixedlen_size_to_copy;
1424 std::memcpy(data_dest, attribute_buffer_start, varlen_size);
1425 attribute_buffer_start += varlen_size;
1426 if (nullable) {
1427 std::memcpy(
1428 validity_dest, attribute_buffer_start, validitylen_size);
1429 attribute_buffer_start += validitylen_size;
1430 }
1431
1432 // The offsets in each buffer correspond to the values in its
1433 // data buffer. To build a single contigious buffer, we must
1434 // ensure the offset values continue in ascending order from the
1435 // previous buffer. For example, consider the following example
1436 // storing int32 values.
1437 //
1438 // Buffer #1:
1439 // offsets: [0, 8, 16]
1440 // values: [1, 2, 3, 4, 5, 6, 7]
1441 //
1442 // Buffer #2:
1443 // offsets: [0, 12]
1444 // values: [100, 200, 300, 400, 500]
1445 //
1446 // The final, contigious buffer will be:
1447 // offsets: [0, 8, 16, 28, 40]
1448 // values: [1, 2, 3, 4, 5, 6, 7, 100, 200, 300, 400, 500]
1449 //
1450 // The last two offsets, `28, 40` were calculated by adding `28`
1451 // to offsets of Buffer #2: [0, 12]. The `28` was calculated as
1452 // the byte size of the values in Buffer #1.
1453 if (curr_data_size > 0) {
1454 for (uint64_t i = 0; i < (fixedlen_size_to_copy / sizeof(uint64_t));
1455 ++i) {
1456 reinterpret_cast<uint64_t*>(offset_dest)[i] += curr_data_size;
1457 }
1458 }
1459
1460 if (attr_copy_state == nullptr) {
1461 // Set the size directly on the query (so user can introspect on
1462 // result size).
1463 if (existing_offset_buffer_size_ptr != nullptr)
1464 *existing_offset_buffer_size_ptr =
1465 curr_offset_size + fixedlen_size_to_copy;
1466 if (existing_buffer_size_ptr != nullptr)
1467 *existing_buffer_size_ptr = curr_data_size + varlen_size;
1468 if (nullable && existing_validity_buffer_size_ptr != nullptr)
1469 *existing_validity_buffer_size_ptr =
1470 curr_validity_size + validitylen_size;
1471 } else {
1472 // Accumulate total bytes copied (caller's responsibility to
1473 // eventually update the query).
1474 attr_copy_state->offset_size += fixedlen_size_to_copy;
1475 attr_copy_state->data_size += varlen_size;
1476 if (nullable)
1477 attr_copy_state->validity_size += validitylen_size;
1478
1479 // Set whether the extra offset was included or not
1480 attr_copy_state->last_query_added_extra_offset =
1481 query_reader.getVarOffsetsAddExtraElement();
1482 }
1483 } else {
1484 // Fixed size attribute; buffers already set.
1485 char* data_dest = (char*)existing_buffer + curr_data_size;
1486
1487 std::memcpy(data_dest, attribute_buffer_start, fixedlen_size);
1488 attribute_buffer_start += fixedlen_size;
1489 if (nullable) {
1490 char* validity_dest =
1491 (char*)existing_validity_buffer + curr_validity_size;
1492 std::memcpy(
1493 validity_dest, attribute_buffer_start, validitylen_size);
1494 attribute_buffer_start += validitylen_size;
1495 }
1496
1497 if (attr_copy_state == nullptr) {
1498 if (existing_buffer_size_ptr != nullptr)
1499 *existing_buffer_size_ptr = curr_data_size + fixedlen_size;
1500 if (nullable && existing_validity_buffer_size_ptr != nullptr)
1501 *existing_validity_buffer_size_ptr =
1502 curr_validity_size + validitylen_size;
1503 } else {
1504 attr_copy_state->data_size += fixedlen_size;
1505 if (nullable)
1506 attr_copy_state->validity_size += validitylen_size;
1507 }
1508 }
1509 }
1510 } else if (context == SerializationContext::SERVER) {
1511 // Always expect null buffers when deserializing.
1512 if (existing_buffer != nullptr || existing_offset_buffer != nullptr ||
1513 existing_validity_buffer != nullptr)
1514 return LOG_STATUS(Status::SerializationError(
1515 "Error deserializing read query; unexpected "
1516 "buffer set on server-side."));
1517
1518 Query::SerializationState::AttrState* attr_state;
1519 RETURN_NOT_OK(query->get_attr_serialization_state(name, &attr_state));
1520 if (type == QueryType::READ) {
1521 // On reads, just set null pointers with accurate size so that the
1522 // server can introspect and allocate properly sized buffers separately.
1523 Buffer offsets_buff(nullptr, fixedlen_size);
1524 Buffer varlen_buff(nullptr, varlen_size);
1525 Buffer validitylen_buff(nullptr, validitylen_size);
1526 // For the server on reads we want to set the original user requested
1527 // buffer sizes This handles the case of incomplete queries where on the
1528 // second `submit()` call the client's buffer size will be the first
1529 // submit's result size not the original user set buffer size. To work
1530 // around this we revert the server to always use the full original user
1531 // requested buffer sizes.
1532 // We check for > 0 for fallback for clients older than 2.2.5
1533 if (buffer_header.getOriginalFixedLenBufferSizeInBytes() > 0) {
1534 attr_state->fixed_len_size =
1535 buffer_header.getOriginalFixedLenBufferSizeInBytes();
1536 } else {
1537 attr_state->fixed_len_size =
1538 buffer_header.getFixedLenBufferSizeInBytes();
1539 }
1540
1541 if (buffer_header.getOriginalVarLenBufferSizeInBytes() > 0) {
1542 attr_state->var_len_size =
1543 buffer_header.getOriginalVarLenBufferSizeInBytes();
1544 } else {
1545 attr_state->var_len_size = buffer_header.getVarLenBufferSizeInBytes();
1546 }
1547
1548 if (buffer_header.getOriginalValidityLenBufferSizeInBytes() > 0) {
1549 attr_state->validity_len_size =
1550 buffer_header.getOriginalValidityLenBufferSizeInBytes();
1551 } else {
1552 attr_state->validity_len_size =
1553 buffer_header.getValidityLenBufferSizeInBytes();
1554 }
1555
1556 attr_state->fixed_len_data.swap(offsets_buff);
1557 attr_state->var_len_data.swap(varlen_buff);
1558 attr_state->validity_len_data.swap(validitylen_buff);
1559 if (var_size) {
1560 if (!nullable) {
1561 RETURN_NOT_OK(query->set_data_buffer(
1562 name, nullptr, &attr_state->var_len_size, false));
1563 RETURN_NOT_OK(query->set_offsets_buffer(
1564 name, nullptr, &attr_state->fixed_len_size, false));
1565 } else {
1566 RETURN_NOT_OK(query->set_buffer_vbytemap(
1567 name,
1568 nullptr,
1569 &attr_state->fixed_len_size,
1570 nullptr,
1571 &attr_state->var_len_size,
1572 nullptr,
1573 &attr_state->validity_len_size,
1574 false));
1575 }
1576 } else {
1577 if (!nullable) {
1578 RETURN_NOT_OK(query->set_data_buffer(
1579 name, nullptr, &attr_state->fixed_len_size, false));
1580 } else {
1581 RETURN_NOT_OK(query->set_buffer_vbytemap(
1582 name,
1583 nullptr,
1584 &attr_state->fixed_len_size,
1585 nullptr,
1586 &attr_state->validity_len_size,
1587 false));
1588 }
1589 }
1590 } else {
1591 // On writes, just set buffer pointers wrapping the data in the message.
1592 if (var_size) {
1593 auto* offsets = reinterpret_cast<uint64_t*>(attribute_buffer_start);
1594 auto* varlen_data = attribute_buffer_start + fixedlen_size;
1595 auto* validity = reinterpret_cast<uint8_t*>(
1596 attribute_buffer_start + fixedlen_size + varlen_size);
1597
1598 attribute_buffer_start +=
1599 fixedlen_size + varlen_size + validitylen_size;
1600
1601 Buffer offsets_buff(offsets, fixedlen_size);
1602 Buffer varlen_buff(varlen_data, varlen_size);
1603 Buffer validity_buff(
1604 validitylen_size > 0 ? validity : nullptr, validitylen_size);
1605
1606 attr_state->fixed_len_size = fixedlen_size;
1607 attr_state->var_len_size = varlen_size;
1608 attr_state->validity_len_size = validitylen_size;
1609
1610 attr_state->fixed_len_data.swap(offsets_buff);
1611 attr_state->var_len_data.swap(varlen_buff);
1612 attr_state->validity_len_data.swap(validity_buff);
1613
1614 if (!nullable) {
1615 RETURN_NOT_OK(query->set_data_buffer(
1616 name, varlen_data, &attr_state->var_len_size));
1617 RETURN_NOT_OK(query->set_offsets_buffer(
1618 name, offsets, &attr_state->fixed_len_size));
1619 } else {
1620 RETURN_NOT_OK(query->set_buffer_vbytemap(
1621 name,
1622 offsets,
1623 &attr_state->fixed_len_size,
1624 varlen_data,
1625 &attr_state->var_len_size,
1626 validity,
1627 &attr_state->validity_len_size));
1628 }
1629 } else {
1630 auto* data = attribute_buffer_start;
1631 auto* validity = reinterpret_cast<uint8_t*>(
1632 attribute_buffer_start + fixedlen_size);
1633
1634 attribute_buffer_start += fixedlen_size + validitylen_size;
1635
1636 Buffer buff(data, fixedlen_size);
1637 Buffer varlen_buff(nullptr, 0);
1638 Buffer validity_buff(
1639 validitylen_size > 0 ? validity : nullptr, validitylen_size);
1640
1641 attr_state->fixed_len_size = fixedlen_size;
1642 attr_state->var_len_size = varlen_size;
1643 attr_state->validity_len_size = validitylen_size;
1644
1645 attr_state->fixed_len_data.swap(buff);
1646 attr_state->var_len_data.swap(varlen_buff);
1647 attr_state->validity_len_data.swap(validity_buff);
1648
1649 if (!nullable) {
1650 RETURN_NOT_OK(query->set_data_buffer(
1651 name, data, &attr_state->fixed_len_size));
1652 } else {
1653 RETURN_NOT_OK(query->set_buffer_vbytemap(
1654 name,
1655 data,
1656 &attr_state->fixed_len_size,
1657 validity,
1658 &attr_state->validity_len_size));
1659 }
1660 }
1661 }
1662 }
1663 }
1664
1665 // Deserialize reader/writer.
1666 // Also set subarray on query if it exists. Prior to 1.8 the subarray was set
1667 // on the reader or writer directly Now we set it on the query class after the
1668 // heterogeneous coordinate changes
1669 if (type == QueryType::READ) {
1670 if (query_reader.hasReaderIndex() && !schema->dense() &&
1671 layout == Layout::GLOBAL_ORDER) {
1672 // Strategy needs to be cleared here to create the correct reader.
1673 query->clear_strategy();
1674 RETURN_NOT_OK(query->set_layout_unsafe(layout));
1675
1676 auto reader_reader = query_reader.getReaderIndex();
1677 auto reader = (SparseGlobalOrderReader*)query->strategy();
1678
1679 if (query_reader.hasVarOffsetsMode()) {
1680 RETURN_NOT_OK(
1681 reader->set_offsets_mode(query_reader.getVarOffsetsMode()));
1682 }
1683
1684 RETURN_NOT_OK(reader->set_offsets_extra_element(
1685 query_reader.getVarOffsetsAddExtraElement()));
1686
1687 if (query_reader.getVarOffsetsBitsize() > 0) {
1688 RETURN_NOT_OK(
1689 reader->set_offsets_bitsize(query_reader.getVarOffsetsBitsize()));
1690 }
1691
1692 RETURN_NOT_OK(reader->initialize_memory_budget());
1693
1694 RETURN_NOT_OK(
1695 index_reader_from_capnp(schema, reader_reader, query, reader));
1696 } else if (
1697 query_reader.hasReaderIndex() && !schema->dense() &&
1698 layout == Layout::UNORDERED && schema->allows_dups()) {
1699 // Strategy needs to be cleared here to create the correct reader.
1700 query->clear_strategy();
1701 RETURN_NOT_OK(query->set_layout_unsafe(layout));
1702
1703 auto reader_reader = query_reader.getReaderIndex();
1704 auto reader = (SparseUnorderedWithDupsReader*)query->strategy();
1705
1706 if (query_reader.hasVarOffsetsMode()) {
1707 RETURN_NOT_OK(
1708 reader->set_offsets_mode(query_reader.getVarOffsetsMode()));
1709 }
1710
1711 RETURN_NOT_OK(reader->set_offsets_extra_element(
1712 query_reader.getVarOffsetsAddExtraElement()));
1713
1714 if (query_reader.getVarOffsetsBitsize() > 0) {
1715 RETURN_NOT_OK(
1716 reader->set_offsets_bitsize(query_reader.getVarOffsetsBitsize()));
1717 }
1718
1719 RETURN_NOT_OK(reader->initialize_memory_budget());
1720
1721 RETURN_NOT_OK(
1722 index_reader_from_capnp(schema, reader_reader, query, reader));
1723 } else if (query_reader.hasDenseReader()) {
1724 // Strategy needs to be cleared here to create the correct reader.
1725 query->clear_strategy();
1726 RETURN_NOT_OK(query->set_layout_unsafe(layout));
1727
1728 auto reader_reader = query_reader.getDenseReader();
1729 auto reader = (DenseReader*)query->strategy();
1730
1731 if (query_reader.hasVarOffsetsMode()) {
1732 RETURN_NOT_OK(
1733 reader->set_offsets_mode(query_reader.getVarOffsetsMode()));
1734 }
1735
1736 RETURN_NOT_OK(reader->set_offsets_extra_element(
1737 query_reader.getVarOffsetsAddExtraElement()));
1738
1739 if (query_reader.getVarOffsetsBitsize() > 0) {
1740 RETURN_NOT_OK(
1741 reader->set_offsets_bitsize(query_reader.getVarOffsetsBitsize()));
1742 }
1743
1744 RETURN_NOT_OK(dense_reader_from_capnp(
1745 schema, reader_reader, query, reader, compute_tp));
1746 } else {
1747 auto reader_reader = query_reader.getReader();
1748 auto reader = (Reader*)query->strategy();
1749
1750 if (query_reader.hasVarOffsetsMode()) {
1751 RETURN_NOT_OK(
1752 reader->set_offsets_mode(query_reader.getVarOffsetsMode()));
1753 }
1754
1755 RETURN_NOT_OK(reader->set_offsets_extra_element(
1756 query_reader.getVarOffsetsAddExtraElement()));
1757
1758 if (query_reader.getVarOffsetsBitsize() > 0) {
1759 RETURN_NOT_OK(
1760 reader->set_offsets_bitsize(query_reader.getVarOffsetsBitsize()));
1761 }
1762
1763 RETURN_NOT_OK(
1764 reader_from_capnp(reader_reader, query, reader, compute_tp));
1765 }
1766 } else {
1767 auto writer_reader = query_reader.getWriter();
1768 auto writer = (Writer*)query->strategy();
1769
1770 if (query_reader.hasVarOffsetsMode()) {
1771 RETURN_NOT_OK(writer->set_offsets_mode(query_reader.getVarOffsetsMode()));
1772 }
1773
1774 RETURN_NOT_OK(writer->set_offsets_extra_element(
1775 query_reader.getVarOffsetsAddExtraElement()));
1776
1777 if (query_reader.getVarOffsetsBitsize() > 0) {
1778 RETURN_NOT_OK(
1779 writer->set_offsets_bitsize(query_reader.getVarOffsetsBitsize()));
1780 }
1781
1782 RETURN_NOT_OK(writer_from_capnp(writer_reader, writer));
1783
1784 // For sparse writes we want to explicitly set subarray to nullptr.
1785 const bool sparse_write =
1786 !schema->dense() || query->layout() == Layout::UNORDERED;
1787 if (!sparse_write) {
1788 if (writer_reader.hasSubarray()) {
1789 auto subarray_reader = writer_reader.getSubarray();
1790 void* subarray = nullptr;
1791 RETURN_NOT_OK(
1792 utils::deserialize_subarray(subarray_reader, schema, &subarray));
1793 RETURN_NOT_OK_ELSE(query->set_subarray(subarray), tdb_free(subarray));
1794 tdb_free(subarray);
1795 }
1796
1797 // Subarray
1798 if (writer_reader.hasSubarrayRanges()) {
1799 Subarray subarray(array, layout, writer->stats(), dummy_logger, false);
1800 auto subarray_reader = writer_reader.getSubarrayRanges();
1801 RETURN_NOT_OK(subarray_from_capnp(subarray_reader, &subarray));
1802 RETURN_NOT_OK(query->set_subarray_unsafe(subarray));
1803 }
1804 }
1805 }
1806
1807 // Deserialize status. This must come last because various setters above
1808 // will reset it.
1809 QueryStatus query_status = QueryStatus::UNINITIALIZED;
1810 RETURN_NOT_OK(
1811 query_status_enum(query_reader.getStatus().cStr(), &query_status));
1812 query->set_status(query_status);
1813
1814 // If cap'n proto object has stats set it on c++ object
1815 if (query_reader.hasStats()) {
1816 stats::Stats* stats = query->stats();
1817 // We should always have a stats here
1818 if (stats != nullptr) {
1819 RETURN_NOT_OK(stats_from_capnp(query_reader.getStats(), stats));
1820 }
1821 }
1822
1823 return Status::Ok();
1824 }
1825
/**
 * Serializes `query` into `serialized_buffer`.
 *
 * The first buffer appended to `serialized_buffer` is always the encoded
 * Query message. When buffer data must be shipped (client-side writes,
 * server-side reads), the attribute data is appended after it, one group per
 * attribute buffer header of the message and in that same order:
 * offsets followed by var data (var-sized) or the fixed data, then the
 * validity bytes when a validity buffer size is set.
 *
 * @param query Query to serialize; its array and array schema must be set.
 * @param serialize_type Serialization format. JSON is rejected up front.
 * @param clientside True when called on the client, false on the server.
 * @param serialized_buffer Output list that the buffers are appended to.
 * @return Status::Ok on success, otherwise a SerializationError (including
 *     when a kj::Exception or std::exception is raised while encoding).
 */
Status query_serialize(
    Query* query,
    SerializationType serialize_type,
    bool clientside,
    BufferList* serialized_buffer) {
  // JSON is rejected here, which makes the JSON case of the switch below
  // unreachable for now.
  if (serialize_type == SerializationType::JSON)
    return LOG_STATUS(Status::SerializationError(
        "Cannot serialize query; json format not supported."));

  const auto* array_schema = query->array_schema();
  if (array_schema == nullptr || query->array() == nullptr)
    return LOG_STATUS(Status::SerializationError(
        "Cannot serialize; array or array schema is null."));

  try {
    ::capnp::MallocMessageBuilder message;
    capnp::Query::Builder query_builder = message.initRoot<capnp::Query>();
    RETURN_NOT_OK(query_to_capnp(*query, &query_builder));

    // Determine whether we should be serializing the buffer data.
    const bool serialize_buffers =
        (clientside && query->type() == QueryType::WRITE) ||
        (!clientside && query->type() == QueryType::READ);

    switch (serialize_type) {
      case SerializationType::JSON: {
        ::capnp::JsonCodec json;
        kj::String capnp_json = json.encode(query_builder);
        const auto json_len = capnp_json.size();
        const char nul = '\0';
        Buffer header;
        // size does not include needed null terminator, so add +1
        RETURN_NOT_OK(header.realloc(json_len + 1));
        RETURN_NOT_OK(header.write(capnp_json.cStr(), json_len));
        RETURN_NOT_OK(header.write(&nul, 1));
        RETURN_NOT_OK(serialized_buffer->add_buffer(std::move(header)));
        // TODO: At this point the buffer data should also be serialized.
        break;
      }
      case SerializationType::CAPNP: {
        kj::Array<::capnp::word> protomessage = messageToFlatArray(message);
        kj::ArrayPtr<const char> message_chars = protomessage.asChars();

        // Write the serialized query
        Buffer header;
        RETURN_NOT_OK(header.realloc(message_chars.size()));
        RETURN_NOT_OK(
            header.write(message_chars.begin(), message_chars.size()));
        RETURN_NOT_OK(serialized_buffer->add_buffer(std::move(header)));

        // Concatenate buffers to end of message. The order below (offsets,
        // var data / fixed data, validity), iterated in header order, is the
        // wire layout the deserializer walks, so it must not change.
        if (serialize_buffers) {
          auto attr_buffer_builders = query_builder.getAttributeBufferHeaders();
          for (auto attr_buffer_builder : attr_buffer_builders) {
            const std::string name = attr_buffer_builder.getName().cStr();

            auto query_buffer = query->buffer(name);

            // NOTE(review): this var-sized branch dereferences buffer_size_
            // without checking it for null; confirm callers guarantee it.
            if (query_buffer.buffer_var_size_ != nullptr &&
                query_buffer.buffer_var_ != nullptr) {
              Buffer offsets(query_buffer.buffer_, *query_buffer.buffer_size_);
              RETURN_NOT_OK(serialized_buffer->add_buffer(std::move(offsets)));
              Buffer data(
                  query_buffer.buffer_var_, *query_buffer.buffer_var_size_);
              RETURN_NOT_OK(serialized_buffer->add_buffer(std::move(data)));
            } else if (
                query_buffer.buffer_size_ != nullptr &&
                query_buffer.buffer_ != nullptr) {
              Buffer data(query_buffer.buffer_, *query_buffer.buffer_size_);
              RETURN_NOT_OK(serialized_buffer->add_buffer(std::move(data)));
            } else {
              // NOTE(review): with NDEBUG this appends nothing for the
              // attribute even though its header is in the message; consider
              // returning a SerializationError instead.
              assert(false);
            }

            if (query_buffer.validity_vector_.buffer_size() != nullptr) {
              Buffer validity(
                  query_buffer.validity_vector_.buffer(),
                  *query_buffer.validity_vector_.buffer_size());
              RETURN_NOT_OK(serialized_buffer->add_buffer(std::move(validity)));
            }
          }
        }

        break;
      }
      default:
        return LOG_STATUS(Status::SerializationError(
            "Cannot serialize; unknown serialization type"));
    }
  } catch (kj::Exception& e) {
    return LOG_STATUS(Status::SerializationError(
        "Cannot serialize; kj::Exception: " +
        std::string(e.getDescription().cStr())));
  } catch (std::exception& e) {
    return LOG_STATUS(Status::SerializationError(
        "Cannot serialize; exception: " + std::string(e.what())));
  }

  return Status::Ok();
}
1926
do_query_deserialize(const Buffer & serialized_buffer,SerializationType serialize_type,const SerializationContext context,CopyState * const copy_state,Query * query,ThreadPool * compute_tp)1927 Status do_query_deserialize(
1928 const Buffer& serialized_buffer,
1929 SerializationType serialize_type,
1930 const SerializationContext context,
1931 CopyState* const copy_state,
1932 Query* query,
1933 ThreadPool* compute_tp) {
1934 if (serialize_type == SerializationType::JSON)
1935 return LOG_STATUS(Status::SerializationError(
1936 "Cannot deserialize query; json format not supported."));
1937
1938 try {
1939 switch (serialize_type) {
1940 case SerializationType::JSON: {
1941 ::capnp::JsonCodec json;
1942 ::capnp::MallocMessageBuilder message_builder;
1943 capnp::Query::Builder query_builder =
1944 message_builder.initRoot<capnp::Query>();
1945 json.decode(
1946 kj::StringPtr(
1947 static_cast<const char*>(serialized_buffer.cur_data())),
1948 query_builder);
1949 capnp::Query::Reader query_reader = query_builder.asReader();
1950 return query_from_capnp(
1951 query_reader, context, nullptr, copy_state, query, compute_tp);
1952 }
1953 case SerializationType::CAPNP: {
1954 // Capnp FlatArrayMessageReader requires 64-bit alignment.
1955 if (!utils::is_aligned<sizeof(uint64_t)>(serialized_buffer.cur_data()))
1956 return LOG_STATUS(Status::SerializationError(
1957 "Could not deserialize query; buffer is not 8-byte aligned."));
1958
1959 // Set traversal limit to 10GI (TODO: make this a config option)
1960 ::capnp::ReaderOptions readerOptions;
1961 readerOptions.traversalLimitInWords = uint64_t(1024) * 1024 * 1024 * 10;
1962 ::capnp::FlatArrayMessageReader reader(
1963 kj::arrayPtr(
1964 reinterpret_cast<const ::capnp::word*>(
1965 serialized_buffer.cur_data()),
1966 (serialized_buffer.size() - serialized_buffer.offset()) /
1967 sizeof(::capnp::word)),
1968 readerOptions);
1969
1970 capnp::Query::Reader query_reader = reader.getRoot<capnp::Query>();
1971
1972 // Get a pointer to the start of the attribute buffer data (which
1973 // was concatenated after the CapnP message on serialization).
1974 auto attribute_buffer_start = reader.getEnd();
1975 auto buffer_start = const_cast<::capnp::word*>(attribute_buffer_start);
1976 return query_from_capnp(
1977 query_reader, context, buffer_start, copy_state, query, compute_tp);
1978 }
1979 default:
1980 return LOG_STATUS(Status::SerializationError(
1981 "Cannot deserialize; unknown serialization type."));
1982 }
1983 } catch (kj::Exception& e) {
1984 return LOG_STATUS(Status::SerializationError(
1985 "Cannot deserialize; kj::Exception: " +
1986 std::string(e.getDescription().cStr())));
1987 } catch (std::exception& e) {
1988 return LOG_STATUS(Status::SerializationError(
1989 "Cannot deserialize; exception: " + std::string(e.what())));
1990 }
1991 return Status::Ok();
1992 }
1993
query_deserialize(const Buffer & serialized_buffer,SerializationType serialize_type,bool clientside,CopyState * copy_state,Query * query,ThreadPool * compute_tp)1994 Status query_deserialize(
1995 const Buffer& serialized_buffer,
1996 SerializationType serialize_type,
1997 bool clientside,
1998 CopyState* copy_state,
1999 Query* query,
2000 ThreadPool* compute_tp) {
2001 // Create an original, serialized copy of the 'query' that we will revert
2002 // to if we are unable to deserialize 'serialized_buffer'.
2003 BufferList original_bufferlist;
2004 RETURN_NOT_OK(
2005 query_serialize(query, serialize_type, clientside, &original_bufferlist));
2006
2007 // The first buffer is always the serialized Query object.
2008 tiledb::sm::Buffer* original_buffer;
2009 RETURN_NOT_OK(original_bufferlist.get_buffer(0, &original_buffer));
2010 original_buffer->reset_offset();
2011
2012 // Similarly, we must create a copy of 'copy_state'.
2013 tdb_unique_ptr<CopyState> original_copy_state = nullptr;
2014 if (copy_state) {
2015 original_copy_state =
2016 tdb_unique_ptr<CopyState>(tdb_new(CopyState, *copy_state));
2017 }
2018
2019 // Deserialize 'serialized_buffer'.
2020 const Status st = do_query_deserialize(
2021 serialized_buffer,
2022 serialize_type,
2023 clientside ? SerializationContext::CLIENT : SerializationContext::SERVER,
2024 copy_state,
2025 query,
2026 compute_tp);
2027
2028 // If the deserialization failed, deserialize 'serialized_query_original'
2029 // into 'query' to ensure that 'query' is in the state it was before the
2030 // deserialization of 'serialized_buffer' failed.
2031 if (!st.ok()) {
2032 if (original_copy_state) {
2033 *copy_state = *original_copy_state;
2034 } else {
2035 copy_state = NULL;
2036 }
2037
2038 const Status st2 = do_query_deserialize(
2039 *original_buffer,
2040 serialize_type,
2041 SerializationContext::BACKUP,
2042 copy_state,
2043 query,
2044 compute_tp);
2045 if (!st2.ok()) {
2046 LOG_FATAL(st2.message());
2047 }
2048 }
2049
2050 return st;
2051 }
2052
query_est_result_size_reader_to_capnp(Query & query,capnp::EstimatedResultSize::Builder * est_result_size_builder)2053 Status query_est_result_size_reader_to_capnp(
2054 Query& query,
2055 capnp::EstimatedResultSize::Builder* est_result_size_builder) {
2056 using namespace tiledb::sm;
2057
2058 auto est_buffer_size_map = query.get_est_result_size_map();
2059 auto max_mem_size_map = query.get_max_mem_size_map();
2060
2061 auto result_sizes_builder = est_result_size_builder->initResultSizes();
2062 auto result_sizes_builder_entries =
2063 result_sizes_builder.initEntries(est_buffer_size_map.size());
2064 int i = 0;
2065 for (auto& it : est_buffer_size_map) {
2066 auto range_builder = result_sizes_builder_entries[i];
2067 range_builder.setKey(it.first);
2068 capnp::EstimatedResultSize::ResultSize::Builder result_size_builder =
2069 range_builder.initValue();
2070 result_size_builder.setSizeFixed(it.second.size_fixed_);
2071 result_size_builder.setSizeVar(it.second.size_var_);
2072 result_size_builder.setSizeValidity(it.second.size_validity_);
2073 ++i;
2074 }
2075
2076 auto memory_sizes_builder = est_result_size_builder->initMemorySizes();
2077 auto memory_sizes_builder_entries =
2078 memory_sizes_builder.initEntries(est_buffer_size_map.size());
2079 i = 0;
2080 for (auto& it : max_mem_size_map) {
2081 auto range_builder = memory_sizes_builder_entries[i];
2082 range_builder.setKey(it.first);
2083 capnp::EstimatedResultSize::MemorySize::Builder result_size_builder =
2084 range_builder.initValue();
2085 result_size_builder.setSizeFixed(it.second.size_fixed_);
2086 result_size_builder.setSizeVar(it.second.size_var_);
2087 result_size_builder.setSizeValidity(it.second.size_validity_);
2088 ++i;
2089 }
2090
2091 return Status::Ok();
2092 }
2093
query_est_result_size_reader_from_capnp(const capnp::EstimatedResultSize::Reader & est_result_size_reader,Query * const query)2094 Status query_est_result_size_reader_from_capnp(
2095 const capnp::EstimatedResultSize::Reader& est_result_size_reader,
2096 Query* const query) {
2097 using namespace tiledb::sm;
2098
2099 auto est_result_sizes = est_result_size_reader.getResultSizes();
2100 auto max_memory_sizes = est_result_size_reader.getMemorySizes();
2101
2102 std::unordered_map<std::string, Subarray::ResultSize> est_result_sizes_map;
2103 for (auto it : est_result_sizes.getEntries()) {
2104 std::string name = it.getKey();
2105 auto result_size = it.getValue();
2106 est_result_sizes_map.emplace(
2107 name,
2108 Subarray::ResultSize{result_size.getSizeFixed(),
2109 result_size.getSizeVar(),
2110 result_size.getSizeValidity()});
2111 }
2112
2113 std::unordered_map<std::string, Subarray::MemorySize> max_memory_sizes_map;
2114 for (auto it : max_memory_sizes.getEntries()) {
2115 std::string name = it.getKey();
2116 auto memory_size = it.getValue();
2117 max_memory_sizes_map.emplace(
2118 name,
2119 Subarray::MemorySize{memory_size.getSizeFixed(),
2120 memory_size.getSizeVar(),
2121 memory_size.getSizeValidity()});
2122 }
2123
2124 return query->set_est_result_size(est_result_sizes_map, max_memory_sizes_map);
2125 }
2126
query_est_result_size_serialize(Query * query,SerializationType serialize_type,bool clientside,Buffer * serialized_buffer)2127 Status query_est_result_size_serialize(
2128 Query* query,
2129 SerializationType serialize_type,
2130 bool clientside,
2131 Buffer* serialized_buffer) {
2132 (void)clientside;
2133 try {
2134 ::capnp::MallocMessageBuilder message;
2135 capnp::EstimatedResultSize::Builder est_result_size_builder =
2136 message.initRoot<capnp::EstimatedResultSize>();
2137 RETURN_NOT_OK(query_est_result_size_reader_to_capnp(
2138 *query, &est_result_size_builder));
2139
2140 switch (serialize_type) {
2141 case SerializationType::JSON: {
2142 ::capnp::JsonCodec json;
2143 kj::String capnp_json = json.encode(est_result_size_builder);
2144 const auto json_len = capnp_json.size();
2145 const char nul = '\0';
2146 // size does not include needed null terminator, so add +1
2147 RETURN_NOT_OK(serialized_buffer->realloc(json_len + 1));
2148 RETURN_NOT_OK(serialized_buffer->write(capnp_json.cStr(), json_len));
2149 RETURN_NOT_OK(serialized_buffer->write(&nul, 1));
2150 break;
2151 break;
2152 }
2153 case SerializationType::CAPNP: {
2154 kj::Array<::capnp::word> protomessage = messageToFlatArray(message);
2155 kj::ArrayPtr<const char> message_chars = protomessage.asChars();
2156
2157 // Write the serialized query estimated results
2158 const auto nbytes = message_chars.size();
2159 RETURN_NOT_OK(serialized_buffer->realloc(nbytes));
2160 RETURN_NOT_OK(serialized_buffer->write(message_chars.begin(), nbytes));
2161 break;
2162 }
2163 default:
2164 return LOG_STATUS(Status::SerializationError(
2165 "Cannot serialize; unknown serialization type"));
2166 }
2167 } catch (kj::Exception& e) {
2168 return LOG_STATUS(Status::SerializationError(
2169 "Cannot serialize; kj::Exception: " +
2170 std::string(e.getDescription().cStr())));
2171 } catch (std::exception& e) {
2172 return LOG_STATUS(Status::SerializationError(
2173 "Cannot serialize; exception: " + std::string(e.what())));
2174 }
2175
2176 return Status::Ok();
2177 }
2178
query_est_result_size_deserialize(Query * query,SerializationType serialize_type,bool clientside,const Buffer & serialized_buffer)2179 Status query_est_result_size_deserialize(
2180 Query* query,
2181 SerializationType serialize_type,
2182 bool clientside,
2183 const Buffer& serialized_buffer) {
2184 (void)clientside;
2185 try {
2186 switch (serialize_type) {
2187 case SerializationType::JSON: {
2188 ::capnp::JsonCodec json;
2189 ::capnp::MallocMessageBuilder message_builder;
2190 capnp::EstimatedResultSize::Builder estimated_result_size_builder =
2191 message_builder.initRoot<capnp::EstimatedResultSize>();
2192 json.decode(
2193 kj::StringPtr(static_cast<const char*>(serialized_buffer.data())),
2194 estimated_result_size_builder);
2195 capnp::EstimatedResultSize::Reader estimated_result_size_reader =
2196 estimated_result_size_builder.asReader();
2197 RETURN_NOT_OK(query_est_result_size_reader_from_capnp(
2198 estimated_result_size_reader, query));
2199 break;
2200 }
2201 case SerializationType::CAPNP: {
2202 const auto mBytes =
2203 reinterpret_cast<const kj::byte*>(serialized_buffer.data());
2204 ::capnp::FlatArrayMessageReader reader(kj::arrayPtr(
2205 reinterpret_cast<const ::capnp::word*>(mBytes),
2206 serialized_buffer.size() / sizeof(::capnp::word)));
2207 capnp::EstimatedResultSize::Reader estimated_result_size_reader =
2208 reader.getRoot<capnp::EstimatedResultSize>();
2209 RETURN_NOT_OK(query_est_result_size_reader_from_capnp(
2210 estimated_result_size_reader, query));
2211 break;
2212 }
2213 default: {
2214 return LOG_STATUS(
2215 Status::SerializationError("Error deserializing query est result "
2216 "size; Unknown serialization type "
2217 "passed"));
2218 }
2219 }
2220 } catch (kj::Exception& e) {
2221 return LOG_STATUS(Status::SerializationError(
2222 "Error deserializing query est result size; kj::Exception: " +
2223 std::string(e.getDescription().cStr())));
2224 } catch (std::exception& e) {
2225 return LOG_STATUS(Status::SerializationError(
2226 "Error deserializing query est result size; exception " +
2227 std::string(e.what())));
2228 }
2229
2230 return Status::Ok();
2231 }
2232
2233 #else
2234
2235 Status query_serialize(Query*, SerializationType, bool, BufferList*) {
2236 return LOG_STATUS(Status::SerializationError(
2237 "Cannot serialize; serialization not enabled."));
2238 }
2239
2240 Status query_deserialize(
2241 const Buffer&, SerializationType, bool, CopyState*, Query*, ThreadPool*) {
2242 return LOG_STATUS(Status::SerializationError(
2243 "Cannot deserialize; serialization not enabled."));
2244 }
2245
2246 Status query_est_result_size_serialize(
2247 Query*, SerializationType, bool, Buffer*) {
2248 return LOG_STATUS(Status::SerializationError(
2249 "Cannot serialize; serialization not enabled."));
2250 }
2251
2252 Status query_est_result_size_deserialize(
2253 Query*, SerializationType, bool, const Buffer&) {
2254 return LOG_STATUS(Status::SerializationError(
2255 "Cannot deserialize; serialization not enabled."));
2256 }
2257
2258 #endif // TILEDB_SERIALIZATION
2259
2260 } // namespace serialization
2261 } // namespace sm
2262 } // namespace tiledb
2263