1 /**
2 * @file rest_client.cc
3 *
4 * @section LICENSE
5 *
6 * The MIT License
7 *
8 * @copyright Copyright (c) 2018-2021 TileDB, Inc.
9 *
10 * Permission is hereby granted, free of charge, to any person obtaining a copy
11 * of this software and associated documentation files (the "Software"), to deal
12 * in the Software without restriction, including without limitation the rights
13 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
14 * copies of the Software, and to permit persons to whom the Software is
15 * furnished to do so, subject to the following conditions:
16 *
17 * The above copyright notice and this permission notice shall be included in
18 * all copies or substantial portions of the Software.
19 *
20 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
21 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
22 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
23 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
24 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
25 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
26 * THE SOFTWARE.
27 *
28 * @section DESCRIPTION
29 *
 * This file implements the REST client class.
31 */
32
33 // clang-format off
34 #ifdef TILEDB_SERIALIZATION
35 #include "tiledb/sm/serialization/capnp_utils.h"
36 #include "tiledb/sm/serialization/array_schema_evolution.h"
37 #include "tiledb/sm/serialization/query.h"
38 #include "tiledb/sm/serialization/tiledb-rest.h"
39 #include "tiledb/sm/rest/curl.h" // must be included last to avoid Windows.h
40 #endif
41 // clang-format on
42
43 // Something, somewhere seems to be defining TIME_MS as a macro
44 #if defined(TIME_MS)
45 #undef TIME_MS
46 #endif
47
48 #include <cassert>
49
50 #include "tiledb/common/logger.h"
51 #include "tiledb/sm/array/array.h"
52 #include "tiledb/sm/enums/query_type.h"
53 #include "tiledb/sm/misc/constants.h"
54 #include "tiledb/sm/misc/utils.h"
55 #include "tiledb/sm/query/query.h"
56 #include "tiledb/sm/rest/rest_client.h"
57 #include "tiledb/sm/serialization/array_schema.h"
58
59 using namespace tiledb::common;
60
61 namespace tiledb {
62 namespace sm {
63
64 #ifdef TILEDB_SERIALIZATION
65
RestClient()66 RestClient::RestClient()
67 : stats_(nullptr)
68 , config_(nullptr)
69 , compute_tp_(nullptr)
70 , resubmit_incomplete_(true) {
71 auto st = utils::parse::convert(
72 Config::REST_SERIALIZATION_DEFAULT_FORMAT, &serialization_type_);
73 assert(st.ok());
74 (void)st;
75 }
76
init(stats::Stats * const parent_stats,const Config * config,ThreadPool * compute_tp)77 Status RestClient::init(
78 stats::Stats* const parent_stats,
79 const Config* config,
80 ThreadPool* compute_tp) {
81 if (config == nullptr)
82 return LOG_STATUS(
83 Status::RestError("Error initializing rest client; config is null."));
84
85 stats_ = parent_stats->create_child("RestClient");
86
87 config_ = config;
88 compute_tp_ = compute_tp;
89
90 const char* c_str;
91 RETURN_NOT_OK(config_->get("rest.server_address", &c_str));
92 if (c_str != nullptr)
93 rest_server_ = std::string(c_str);
94 if (rest_server_.empty())
95 return LOG_STATUS(Status::RestError(
96 "Error initializing rest client; server address is empty."));
97
98 RETURN_NOT_OK(config_->get("rest.server_serialization_format", &c_str));
99 if (c_str != nullptr)
100 RETURN_NOT_OK(serialization_type_enum(c_str, &serialization_type_));
101
102 RETURN_NOT_OK(config_->get("rest.resubmit_incomplete", &c_str));
103 if (c_str != nullptr)
104 RETURN_NOT_OK(utils::parse::convert(c_str, &resubmit_incomplete_));
105
106 return Status::Ok();
107 }
108
set_header(const std::string & name,const std::string & value)109 Status RestClient::set_header(
110 const std::string& name, const std::string& value) {
111 extra_headers_[name] = value;
112 return Status::Ok();
113 }
114
get_array_schema_from_rest(const URI & uri,ArraySchema ** array_schema)115 Status RestClient::get_array_schema_from_rest(
116 const URI& uri, ArraySchema** array_schema) {
117 // Init curl and form the URL
118 Curl curlc;
119 std::string array_ns, array_uri;
120 RETURN_NOT_OK(uri.get_rest_components(&array_ns, &array_uri));
121 const std::string cache_key = array_ns + ":" + array_uri;
122 RETURN_NOT_OK(
123 curlc.init(config_, extra_headers_, &redirect_meta_, &redirect_mtx_));
124 const std::string url = redirect_uri(cache_key) + "/v1/arrays/" + array_ns +
125 "/" + curlc.url_escape(array_uri);
126
127 // Get the data
128 Buffer returned_data;
129 RETURN_NOT_OK(curlc.get_data(
130 stats_, url, serialization_type_, &returned_data, cache_key));
131 if (returned_data.data() == nullptr || returned_data.size() == 0)
132 return LOG_STATUS(Status::RestError(
133 "Error getting array schema from REST; server returned no data."));
134
135 return serialization::array_schema_deserialize(
136 array_schema, serialization_type_, returned_data);
137 }
138
post_array_schema_to_rest(const URI & uri,ArraySchema * array_schema)139 Status RestClient::post_array_schema_to_rest(
140 const URI& uri, ArraySchema* array_schema) {
141 Buffer buff;
142 RETURN_NOT_OK(serialization::array_schema_serialize(
143 array_schema, serialization_type_, &buff, false));
144 // Wrap in a list
145 BufferList serialized;
146 RETURN_NOT_OK(serialized.add_buffer(std::move(buff)));
147
148 bool found = false;
149 const std::string creation_access_credentials_name =
150 config_->get("rest.creation_access_credentials_name", &found);
151 if (found)
152 RETURN_NOT_OK(set_header(
153 "X-TILEDB-CLOUD-ACCESS-CREDENTIALS-NAME",
154 creation_access_credentials_name));
155
156 // Init curl and form the URL
157 Curl curlc;
158 std::string array_ns, array_uri;
159 RETURN_NOT_OK(uri.get_rest_components(&array_ns, &array_uri));
160 const std::string cache_key = array_ns + ":" + array_uri;
161 RETURN_NOT_OK(
162 curlc.init(config_, extra_headers_, &redirect_meta_, &redirect_mtx_));
163 auto deduced_url = redirect_uri(cache_key) + "/v1/arrays/" + array_ns + "/" +
164 curlc.url_escape(array_uri);
165 Buffer returned_data;
166 const Status sc = curlc.post_data(
167 stats_,
168 deduced_url,
169 serialization_type_,
170 &serialized,
171 &returned_data,
172 cache_key);
173 return sc;
174 }
175
deregister_array_from_rest(const URI & uri)176 Status RestClient::deregister_array_from_rest(const URI& uri) {
177 // Init curl and form the URL
178 Curl curlc;
179 std::string array_ns, array_uri;
180 RETURN_NOT_OK(uri.get_rest_components(&array_ns, &array_uri));
181 const std::string cache_key = array_ns + ":" + array_uri;
182 RETURN_NOT_OK(
183 curlc.init(config_, extra_headers_, &redirect_meta_, &redirect_mtx_));
184 const std::string url = redirect_uri(cache_key) + "/v1/arrays/" + array_ns +
185 "/" + curlc.url_escape(array_uri) + "/deregister";
186
187 Buffer returned_data;
188 return curlc.delete_data(
189 stats_, url, serialization_type_, &returned_data, cache_key);
190 }
191
get_array_non_empty_domain(Array * array,uint64_t timestamp_start,uint64_t timestamp_end)192 Status RestClient::get_array_non_empty_domain(
193 Array* array, uint64_t timestamp_start, uint64_t timestamp_end) {
194 if (array == nullptr)
195 return LOG_STATUS(
196 Status::RestError("Cannot get array non-empty domain; array is null"));
197 if (array->array_uri().to_string().empty())
198 return LOG_STATUS(Status::RestError(
199 "Cannot get array non-empty domain; array URI is empty"));
200
201 // Init curl and form the URL
202 Curl curlc;
203 std::string array_ns, array_uri;
204 RETURN_NOT_OK(array->array_uri().get_rest_components(&array_ns, &array_uri));
205 const std::string cache_key = array_ns + ":" + array_uri;
206 RETURN_NOT_OK(
207 curlc.init(config_, extra_headers_, &redirect_meta_, &redirect_mtx_));
208 const std::string url = redirect_uri(cache_key) + "/v2/arrays/" + array_ns +
209 "/" + curlc.url_escape(array_uri) +
210 "/non_empty_domain?" +
211 "start_timestamp=" + std::to_string(timestamp_start) +
212 "&end_timestamp=" + std::to_string(timestamp_end);
213
214 // Get the data
215 Buffer returned_data;
216 RETURN_NOT_OK(curlc.get_data(
217 stats_, url, serialization_type_, &returned_data, cache_key));
218
219 if (returned_data.data() == nullptr || returned_data.size() == 0)
220 return LOG_STATUS(
221 Status::RestError("Error getting array non-empty domain "
222 "from REST; server returned no data."));
223
224 // Deserialize data returned
225 return serialization::nonempty_domain_deserialize(
226 array, returned_data, serialization_type_);
227 }
228
get_array_max_buffer_sizes(const URI & uri,const ArraySchema * schema,const void * subarray,std::unordered_map<std::string,std::pair<uint64_t,uint64_t>> * buffer_sizes)229 Status RestClient::get_array_max_buffer_sizes(
230 const URI& uri,
231 const ArraySchema* schema,
232 const void* subarray,
233 std::unordered_map<std::string, std::pair<uint64_t, uint64_t>>*
234 buffer_sizes) {
235 // Convert subarray to string for query parameter
236 std::string subarray_str;
237 RETURN_NOT_OK(subarray_to_str(schema, subarray, &subarray_str));
238 std::string subarray_query_param =
239 subarray_str.empty() ? "" : ("?subarray=" + subarray_str);
240
241 // Init curl and form the URL
242 Curl curlc;
243 std::string array_ns, array_uri;
244 RETURN_NOT_OK(uri.get_rest_components(&array_ns, &array_uri));
245 const std::string cache_key = array_ns + ":" + array_uri;
246 RETURN_NOT_OK(
247 curlc.init(config_, extra_headers_, &redirect_meta_, &redirect_mtx_));
248 const std::string url = redirect_uri(cache_key) + "/v1/arrays/" + array_ns +
249 "/" + curlc.url_escape(array_uri) +
250 "/max_buffer_sizes" + subarray_query_param;
251
252 // Get the data
253 Buffer returned_data;
254 RETURN_NOT_OK(curlc.get_data(
255 stats_, url, serialization_type_, &returned_data, cache_key));
256
257 if (returned_data.data() == nullptr || returned_data.size() == 0)
258 return LOG_STATUS(
259 Status::RestError("Error getting array max buffer sizes "
260 "from REST; server returned no data."));
261
262 // Deserialize data returned
263 return serialization::max_buffer_sizes_deserialize(
264 schema, returned_data, serialization_type_, buffer_sizes);
265 }
266
get_array_metadata_from_rest(const URI & uri,uint64_t timestamp_start,uint64_t timestamp_end,Array * array)267 Status RestClient::get_array_metadata_from_rest(
268 const URI& uri,
269 uint64_t timestamp_start,
270 uint64_t timestamp_end,
271 Array* array) {
272 if (array == nullptr)
273 return LOG_STATUS(Status::RestError(
274 "Error getting array metadata from REST; array is null."));
275
276 // Init curl and form the URL
277 Curl curlc;
278 std::string array_ns, array_uri;
279 RETURN_NOT_OK(uri.get_rest_components(&array_ns, &array_uri));
280 const std::string cache_key = array_ns + ":" + array_uri;
281 RETURN_NOT_OK(
282 curlc.init(config_, extra_headers_, &redirect_meta_, &redirect_mtx_));
283 const std::string url = redirect_uri(cache_key) + "/v1/arrays/" + array_ns +
284 "/" + curlc.url_escape(array_uri) +
285 "/array_metadata?" +
286 "start_timestamp=" + std::to_string(timestamp_start) +
287 "&end_timestamp=" + std::to_string(timestamp_end);
288
289 // Get the data
290 Buffer returned_data;
291 RETURN_NOT_OK(curlc.get_data(
292 stats_, url, serialization_type_, &returned_data, cache_key));
293 if (returned_data.data() == nullptr || returned_data.size() == 0)
294 return LOG_STATUS(Status::RestError(
295 "Error getting array metadata from REST; server returned no data."));
296
297 return serialization::array_metadata_deserialize(
298 array, serialization_type_, returned_data);
299 }
300
post_array_metadata_to_rest(const URI & uri,uint64_t timestamp_start,uint64_t timestamp_end,Array * array)301 Status RestClient::post_array_metadata_to_rest(
302 const URI& uri,
303 uint64_t timestamp_start,
304 uint64_t timestamp_end,
305 Array* array) {
306 if (array == nullptr)
307 return LOG_STATUS(Status::RestError(
308 "Error posting array metadata to REST; array is null."));
309
310 Buffer buff;
311 RETURN_NOT_OK(serialization::array_metadata_serialize(
312 array, serialization_type_, &buff));
313 // Wrap in a list
314 BufferList serialized;
315 RETURN_NOT_OK(serialized.add_buffer(std::move(buff)));
316
317 // Init curl and form the URL
318 Curl curlc;
319 std::string array_ns, array_uri;
320 RETURN_NOT_OK(uri.get_rest_components(&array_ns, &array_uri));
321 const std::string cache_key = array_ns + ":" + array_uri;
322 RETURN_NOT_OK(
323 curlc.init(config_, extra_headers_, &redirect_meta_, &redirect_mtx_));
324 const std::string url = redirect_uri(cache_key) + "/v1/arrays/" + array_ns +
325 "/" + curlc.url_escape(array_uri) +
326 "/array_metadata?" +
327 "start_timestamp=" + std::to_string(timestamp_start) +
328 "&end_timestamp=" + std::to_string(timestamp_end);
329
330 // Put the data
331 Buffer returned_data;
332 return curlc.post_data(
333 stats_, url, serialization_type_, &serialized, &returned_data, cache_key);
334 }
335
submit_query_to_rest(const URI & uri,Query * query)336 Status RestClient::submit_query_to_rest(const URI& uri, Query* query) {
337 // Local state tracking for the current offsets into the user's query buffers.
338 // This allows resubmission of incomplete queries while appending to the
339 // same user buffers.
340 serialization::CopyState copy_state;
341
342 RETURN_NOT_OK(post_query_submit(uri, query, ©_state));
343
344 // Now need to update the buffer sizes to the actual copied data size so that
345 // the user can check the result size on reads.
346 RETURN_NOT_OK(update_attribute_buffer_sizes(copy_state, query));
347
348 return Status::Ok();
349 }
350
post_query_submit(const URI & uri,Query * query,serialization::CopyState * copy_state)351 Status RestClient::post_query_submit(
352 const URI& uri, Query* query, serialization::CopyState* copy_state) {
353 // Get array
354 const Array* array = query->array();
355 if (array == nullptr) {
356 return LOG_STATUS(
357 Status::RestError("Error submitting query to REST; null array."));
358 }
359
360 auto rest_scratch = query->rest_scratch();
361
362 if (rest_scratch->size() > 0) {
363 bool skip;
364 query_post_call_back(
365 false, nullptr, 0, &skip, rest_scratch, query, copy_state);
366 }
367
368 // Serialize query to send
369 BufferList serialized;
370 RETURN_NOT_OK(serialization::query_serialize(
371 query, serialization_type_, true, &serialized));
372
373 // Init curl and form the URL
374 Curl curlc;
375 std::string array_ns, array_uri;
376 RETURN_NOT_OK(uri.get_rest_components(&array_ns, &array_uri));
377 const std::string cache_key = array_ns + ":" + array_uri;
378 RETURN_NOT_OK(
379 curlc.init(config_, extra_headers_, &redirect_meta_, &redirect_mtx_));
380 std::string url = redirect_uri(cache_key) + "/v2/arrays/" + array_ns + "/" +
381 curlc.url_escape(array_uri) +
382 "/query/submit?type=" + query_type_str(query->type()) +
383 "&read_all=" + (resubmit_incomplete_ ? "true" : "false");
384
385 // Remote array reads always supply the timestamp.
386 url += "&start_timestamp=" + std::to_string(array->timestamp_start());
387 url += "&end_timestamp=" + std::to_string(array->timestamp_end());
388
389 // Create the callback that will process the response buffers as they
390 // are received.
391 auto write_cb = std::bind(
392 &RestClient::query_post_call_back,
393 this,
394 std::placeholders::_1,
395 std::placeholders::_2,
396 std::placeholders::_3,
397 std::placeholders::_4,
398 rest_scratch,
399 query,
400 copy_state);
401
402 const Status st = curlc.post_data(
403 stats_,
404 url,
405 serialization_type_,
406 &serialized,
407 rest_scratch.get(),
408 std::move(write_cb),
409 cache_key);
410
411 if (!st.ok() && copy_state->empty()) {
412 return LOG_STATUS(Status::RestError(
413 "Error submitting query to REST; "
414 "server returned no data. "
415 "Curl error: " +
416 st.message()));
417 }
418
419 return st;
420 }
421
query_post_call_back(const bool reset,void * const contents,const size_t content_nbytes,bool * const skip_retries,tdb_shared_ptr<Buffer> scratch,Query * query,serialization::CopyState * copy_state)422 size_t RestClient::query_post_call_back(
423 const bool reset,
424 void* const contents,
425 const size_t content_nbytes,
426 bool* const skip_retries,
427 tdb_shared_ptr<Buffer> scratch,
428 Query* query,
429 serialization::CopyState* copy_state) {
430 // All return statements in this function must pass through this wrapper.
431 // This is responsible for two things:
432 // 1. The 'bytes_processed' may be negative in error scenarios. The negative
433 // number is only used for convenience during processing. We must restrict
434 // the return value to >= 0 before returning.
435 // 2. When our return value ('bytes_processed') does not match the number of
436 // input bytes ('content_nbytes'), The CURL layer that invoked this
437 // callback will interpret this as an error and may attempt to retry. We
438 // specifically want to prevent CURL from retrying the request if we have
439 // reached this callback. If we encounter an error within this callback,
440 // the issue is with the response data itself and not an issue with
441 // transporting the response data (for example, the common issue will be
442 // deserialization failure). In this scenario, we will waste time retrying
443 // on response data that we know we cannot handle without error.
444 auto return_wrapper = [content_nbytes, skip_retries](long bytes_processed) {
445 bytes_processed = std::max(bytes_processed, 0L);
446 if (static_cast<size_t>(bytes_processed) != content_nbytes) {
447 *skip_retries = true;
448 }
449 return bytes_processed;
450 };
451
452 // This is the return value that represents the amount of bytes processed
453 // in 'contents'. This will act as the return value and will always be
454 // less-than-or-equal-to 'content_nbytes'.
455 long bytes_processed = 0;
456
457 // When 'reset' is true, we must discard the in-progress memory state.
458 // The most likely scenario is that the request failed and was retried
459 // from within the Curl object.
460 if (reset) {
461 scratch->set_size(0);
462 scratch->reset_offset();
463 copy_state->clear();
464 }
465
466 // If the current scratch size is non-empty, we must subtract its size
467 // from 'bytes_processed' so that we do not count bytes processed from
468 // a previous callback.
469 bytes_processed -= scratch->size();
470
471 // Copy 'contents' to the end of 'scratch'. As a future optimization, if
472 // 'scratch' is empty, we could attempt to process 'contents' in-place and
473 // only copy the remaining, unprocessed bytes into 'scratch'.
474 scratch->set_offset(scratch->size());
475 Status st = scratch->write(contents, content_nbytes);
476 if (!st.ok()) {
477 LOG_ERROR(
478 "Cannot copy libcurl response data; buffer write failed: " +
479 st.to_string());
480 return return_wrapper(bytes_processed);
481 }
482
483 // Process all of the serialized queries contained within 'scratch'.
484 scratch->reset_offset();
485 while (scratch->offset() < scratch->size()) {
486 // We need at least 8 bytes to determine the size of the next
487 // serialized query.
488 if (scratch->offset() + 8 > scratch->size()) {
489 break;
490 }
491
492 // Decode the query size. We could cache this from the previous
493 // callback to prevent decoding the same prefix multiple times.
494 const uint64_t query_size =
495 utils::endianness::decode_le<uint64_t>(scratch->cur_data());
496
497 // We must have the full serialized query before attempting to
498 // deserialize it.
499 if (scratch->offset() + 8 + query_size > scratch->size()) {
500 break;
501 }
502
503 // At this point of execution, we know that we the next serialized
504 // query is entirely in 'scratch'. For convenience, we will advance
505 // the offset to point to the start of the serialized query.
506 scratch->advance_offset(8);
507
508 // We can only deserialize the query if it is 8-byte aligned. If the
509 // offset is 8-byte aligned, we can deserialize the query in-place.
510 // Otherwise, we must make a copy to an auxiliary buffer.
511 if (scratch->offset() % 8 != 0) {
512 // Copy the entire serialized buffer to a newly allocated, 8-byte
513 // aligned auxiliary buffer.
514 Buffer aux;
515 st = aux.write(scratch->cur_data(), query_size);
516 if (!st.ok()) {
517 scratch->set_offset(scratch->offset() - 8);
518 return return_wrapper(bytes_processed);
519 }
520
521 // Deserialize the buffer and store it in 'copy_state'. If
522 // the user buffers are too small to accomodate the attribute
523 // data when deserializing read queries, this will return an
524 // error status.
525 aux.reset_offset();
526 st = serialization::query_deserialize(
527 aux, serialization_type_, true, copy_state, query, compute_tp_);
528 if (!st.ok()) {
529 scratch->set_offset(scratch->offset() - 8);
530 return return_wrapper(bytes_processed);
531 }
532 } else {
533 // Deserialize the buffer and store it in 'copy_state'. If
534 // the user buffers are too small to accomodate the attribute
535 // data when deserializing read queries, this will return an
536 // error status.
537 st = serialization::query_deserialize(
538 *scratch, serialization_type_, true, copy_state, query, compute_tp_);
539 if (!st.ok()) {
540 scratch->set_offset(scratch->offset() - 8);
541 return return_wrapper(bytes_processed);
542 }
543 }
544
545 scratch->advance_offset(query_size);
546 bytes_processed += (query_size + 8);
547 }
548
549 // If there are unprocessed bytes left in the scratch space, copy them
550 // to the beginning of 'scratch'. The intent is to reduce memory
551 // consumption by overwriting the serialized query objects that we
552 // have already processed.
553 const uint64_t length = scratch->size() - scratch->offset();
554 if (scratch->offset() != 0 && length != 0) {
555 const uint64_t offset = scratch->offset();
556 scratch->reset_offset();
557
558 // When the length of the remaining bytes is less than offset,
559 // we can safely read the remaining bytes from 'scratch' and
560 // write them to the beginning of 'scratch' because there will
561 // not be an overlap in accessed memory between the source
562 // and destination. Otherwise, we must use an auxilary buffer
563 // to temporarily store the remaining bytes because the behavior
564 // of the 'memcpy' used 'Buffer::write' will be undefined because
565 // there will be an overlap in the memory of the source and
566 // destination.
567 if (length <= offset) {
568 scratch->reset_size();
569 st = scratch->write(scratch->data(offset), length);
570 } else {
571 Buffer aux;
572 st = aux.write(scratch->data(offset), length);
573 if (st.ok()) {
574 scratch->reset_size();
575 st = scratch->write(aux.data(), aux.size());
576 }
577 }
578
579 assert(st.ok());
580 if (!st.ok()) {
581 LOG_STATUS(st);
582 }
583 assert(scratch->size() == length);
584 }
585
586 bytes_processed += length;
587
588 assert(static_cast<size_t>(bytes_processed) == content_nbytes);
589 return return_wrapper(bytes_processed);
590 }
591
finalize_query_to_rest(const URI & uri,Query * query)592 Status RestClient::finalize_query_to_rest(const URI& uri, Query* query) {
593 // Serialize data to send
594 BufferList serialized;
595 RETURN_NOT_OK(serialization::query_serialize(
596 query, serialization_type_, true, &serialized));
597
598 // Init curl and form the URL
599 Curl curlc;
600 std::string array_ns, array_uri;
601 RETURN_NOT_OK(uri.get_rest_components(&array_ns, &array_uri));
602 const std::string cache_key = array_ns + ":" + array_uri;
603 RETURN_NOT_OK(
604 curlc.init(config_, extra_headers_, &redirect_meta_, &redirect_mtx_));
605 const std::string url =
606 redirect_uri(cache_key) + "/v1/arrays/" + array_ns + "/" +
607 curlc.url_escape(array_uri) +
608 "/query/finalize?type=" + query_type_str(query->type());
609 Buffer returned_data;
610 RETURN_NOT_OK(curlc.post_data(
611 stats_,
612 url,
613 serialization_type_,
614 &serialized,
615 &returned_data,
616 cache_key));
617
618 if (returned_data.data() == nullptr || returned_data.size() == 0)
619 return LOG_STATUS(
620 Status::RestError("Error finalizing query; server returned no data."));
621
622 // Deserialize data returned
623 returned_data.reset_offset();
624 return serialization::query_deserialize(
625 returned_data, serialization_type_, true, nullptr, query, compute_tp_);
626 }
627
subarray_to_str(const ArraySchema * schema,const void * subarray,std::string * subarray_str)628 Status RestClient::subarray_to_str(
629 const ArraySchema* schema,
630 const void* subarray,
631 std::string* subarray_str) {
632 const auto coords_type = schema->dimension(0)->type();
633 const auto dim_num = schema->dim_num();
634 const auto subarray_nelts = 2 * dim_num;
635
636 if (subarray == nullptr) {
637 *subarray_str = "";
638 return Status::Ok();
639 }
640
641 std::stringstream ss;
642 for (unsigned i = 0; i < subarray_nelts; i++) {
643 switch (coords_type) {
644 case Datatype::INT8:
645 ss << ((const int8_t*)subarray)[i];
646 break;
647 case Datatype::UINT8:
648 ss << ((const uint8_t*)subarray)[i];
649 break;
650 case Datatype::INT16:
651 ss << ((const int16_t*)subarray)[i];
652 break;
653 case Datatype::UINT16:
654 ss << ((const uint16_t*)subarray)[i];
655 break;
656 case Datatype::INT32:
657 ss << ((const int32_t*)subarray)[i];
658 break;
659 case Datatype::UINT32:
660 ss << ((const uint32_t*)subarray)[i];
661 break;
662 case Datatype::DATETIME_YEAR:
663 case Datatype::DATETIME_MONTH:
664 case Datatype::DATETIME_WEEK:
665 case Datatype::DATETIME_DAY:
666 case Datatype::DATETIME_HR:
667 case Datatype::DATETIME_MIN:
668 case Datatype::DATETIME_SEC:
669 case Datatype::DATETIME_MS:
670 case Datatype::DATETIME_US:
671 case Datatype::DATETIME_NS:
672 case Datatype::DATETIME_PS:
673 case Datatype::DATETIME_FS:
674 case Datatype::DATETIME_AS:
675 case Datatype::TIME_HR:
676 case Datatype::TIME_MIN:
677 case Datatype::TIME_SEC:
678 case Datatype::TIME_MS:
679 case Datatype::TIME_US:
680 case Datatype::TIME_NS:
681 case Datatype::TIME_PS:
682 case Datatype::TIME_FS:
683 case Datatype::TIME_AS:
684 case Datatype::INT64:
685 ss << ((const int64_t*)subarray)[i];
686 break;
687 case Datatype::UINT64:
688 ss << ((const uint64_t*)subarray)[i];
689 break;
690 case Datatype::FLOAT32:
691 ss << ((const float*)subarray)[i];
692 break;
693 case Datatype::FLOAT64:
694 ss << ((const double*)subarray)[i];
695 break;
696 default:
697 return LOG_STATUS(Status::RestError(
698 "Error converting subarray to string; unhandled datatype."));
699 }
700
701 if (i < subarray_nelts - 1)
702 ss << ",";
703 }
704
705 *subarray_str = ss.str();
706
707 return Status::Ok();
708 }
709
update_attribute_buffer_sizes(const serialization::CopyState & copy_state,Query * query) const710 Status RestClient::update_attribute_buffer_sizes(
711 const serialization::CopyState& copy_state, Query* query) const {
712 // Applicable only to reads
713 if (query->type() != QueryType::READ)
714 return Status::Ok();
715
716 for (const auto& cit : copy_state) {
717 const auto& name = cit.first;
718 auto state = cit.second;
719 auto query_buffer = query->buffer(name);
720 if (query_buffer.buffer_var_size_ != nullptr) {
721 *query_buffer.buffer_var_size_ = state.data_size;
722 *query_buffer.buffer_size_ = state.offset_size;
723 } else if (query_buffer.buffer_size_ != nullptr)
724 *query_buffer.buffer_size_ = state.data_size;
725 }
726
727 return Status::Ok();
728 }
729
get_query_est_result_sizes(const URI & uri,Query * query)730 Status RestClient::get_query_est_result_sizes(const URI& uri, Query* query) {
731 if (query == nullptr)
732 return LOG_STATUS(Status::RestError(
733 "Error getting query estimated result size from REST; Query is null."));
734
735 // Get array
736 const Array* array = query->array();
737 if (array == nullptr) {
738 return LOG_STATUS(Status::RestError(
739 "Error festing query estimated result size from REST; null array."));
740 }
741
742 // Serialize query to send
743 BufferList serialized;
744 RETURN_NOT_OK(serialization::query_serialize(
745 query, serialization_type_, true, &serialized));
746
747 // Init curl and form the URL
748 Curl curlc;
749 std::string array_ns, array_uri;
750 RETURN_NOT_OK(uri.get_rest_components(&array_ns, &array_uri));
751 const std::string cache_key = array_ns + ":" + array_uri;
752 RETURN_NOT_OK(
753 curlc.init(config_, extra_headers_, &redirect_meta_, &redirect_mtx_));
754 std::string url =
755 redirect_uri(cache_key) + "/v1/arrays/" + array_ns + "/" +
756 curlc.url_escape(array_uri) +
757 "/query/est_result_sizes?type=" + query_type_str(query->type());
758
759 // Remote array reads always supply the timestamp.
760 if (query->type() == QueryType::READ) {
761 url += "&start_timestamp=" + std::to_string(array->timestamp_start());
762 url += "&end_timestamp=" + std::to_string(array->timestamp_end());
763 }
764
765 // Get the data
766 Buffer returned_data;
767 RETURN_NOT_OK(curlc.post_data(
768 stats_,
769 url,
770 serialization_type_,
771 &serialized,
772 &returned_data,
773 cache_key));
774 if (returned_data.data() == nullptr || returned_data.size() == 0)
775 return LOG_STATUS(Status::RestError(
776 "Error getting array metadata from REST; server returned no data."));
777
778 return serialization::query_est_result_size_deserialize(
779 query, serialization_type_, true, returned_data);
780 }
781
redirect_uri(const std::string & cache_key)782 std::string RestClient::redirect_uri(const std::string& cache_key) {
783 std::unique_lock<std::mutex> rd_lck(redirect_mtx_);
784 std::unordered_map<std::string, std::string>::const_iterator cache_it =
785 redirect_meta_.find(cache_key);
786
787 return (cache_it == redirect_meta_.end()) ? rest_server_ : cache_it->second;
788 }
789
post_array_schema_evolution_to_rest(const URI & uri,ArraySchemaEvolution * array_schema_evolution)790 Status RestClient::post_array_schema_evolution_to_rest(
791 const URI& uri, ArraySchemaEvolution* array_schema_evolution) {
792 Buffer buff;
793 RETURN_NOT_OK(serialization::array_schema_evolution_serialize(
794 array_schema_evolution, serialization_type_, &buff, false));
795 // Wrap in a list
796 BufferList serialized;
797 RETURN_NOT_OK(serialized.add_buffer(std::move(buff)));
798
799 // Init curl and form the URL
800 Curl curlc;
801 std::string array_ns, array_uri;
802 RETURN_NOT_OK(uri.get_rest_components(&array_ns, &array_uri));
803 const std::string cache_key = array_ns + ":" + array_uri;
804 RETURN_NOT_OK(
805 curlc.init(config_, extra_headers_, &redirect_meta_, &redirect_mtx_));
806 auto deduced_url = redirect_uri(cache_key) + "/v1/arrays/" + array_ns + "/" +
807 curlc.url_escape(array_uri) + "/evolve";
808 Buffer returned_data;
809 const Status sc = curlc.post_data(
810 stats_,
811 deduced_url,
812 serialization_type_,
813 &serialized,
814 &returned_data,
815 cache_key);
816 return sc;
817 }
818
819 #else
820
821 RestClient::RestClient() {
822 (void)config_;
823 (void)rest_server_;
824 (void)serialization_type_;
825 }
826
827 Status RestClient::init(stats::Stats*, const Config*, ThreadPool*) {
828 return LOG_STATUS(
829 Status::RestError("Cannot use rest client; serialization not enabled."));
830 }
831
832 Status RestClient::set_header(const std::string&, const std::string&) {
833 return LOG_STATUS(
834 Status::RestError("Cannot use rest client; serialization not enabled."));
835 }
836
837 Status RestClient::get_array_schema_from_rest(const URI&, ArraySchema**) {
838 return LOG_STATUS(
839 Status::RestError("Cannot use rest client; serialization not enabled."));
840 }
841
842 Status RestClient::post_array_schema_to_rest(const URI&, ArraySchema*) {
843 return LOG_STATUS(
844 Status::RestError("Cannot use rest client; serialization not enabled."));
845 }
846
847 Status RestClient::deregister_array_from_rest(const URI&) {
848 return LOG_STATUS(
849 Status::RestError("Cannot use rest client; serialization not enabled."));
850 }
851
852 Status RestClient::get_array_non_empty_domain(Array*, uint64_t, uint64_t) {
853 return LOG_STATUS(
854 Status::RestError("Cannot use rest client; serialization not enabled."));
855 }
856
857 Status RestClient::get_array_max_buffer_sizes(
858 const URI&,
859 const ArraySchema*,
860 const void*,
861 std::unordered_map<std::string, std::pair<uint64_t, uint64_t>>*) {
862 return LOG_STATUS(
863 Status::RestError("Cannot use rest client; serialization not enabled."));
864 }
865
866 Status RestClient::get_array_metadata_from_rest(
867 const URI&, uint64_t, uint64_t, Array*) {
868 return LOG_STATUS(
869 Status::RestError("Cannot use rest client; serialization not enabled."));
870 }
871
872 Status RestClient::post_array_metadata_to_rest(
873 const URI&, uint64_t, uint64_t, Array*) {
874 return LOG_STATUS(
875 Status::RestError("Cannot use rest client; serialization not enabled."));
876 }
877
878 Status RestClient::submit_query_to_rest(const URI&, Query*) {
879 return LOG_STATUS(
880 Status::RestError("Cannot use rest client; serialization not enabled."));
881 }
882
883 Status RestClient::finalize_query_to_rest(const URI&, Query*) {
884 return LOG_STATUS(
885 Status::RestError("Cannot use rest client; serialization not enabled."));
886 }
887
888 Status RestClient::get_query_est_result_sizes(const URI&, Query*) {
889 return LOG_STATUS(
890 Status::RestError("Cannot use rest client; serialization not enabled."));
891 }
892
893 Status RestClient::post_array_schema_evolution_to_rest(
894 const URI&, ArraySchemaEvolution*) {
895 return LOG_STATUS(
896 Status::RestError("Cannot use rest client; serialization not enabled."));
897 }
898
899 #endif // TILEDB_SERIALIZATION
900
901 } // namespace sm
902 } // namespace tiledb
903