1 /**
2  * @file   rest_client.cc
3  *
4  * @section LICENSE
5  *
6  * The MIT License
7  *
8  * @copyright Copyright (c) 2018-2021 TileDB, Inc.
9  *
10  * Permission is hereby granted, free of charge, to any person obtaining a copy
11  * of this software and associated documentation files (the "Software"), to deal
12  * in the Software without restriction, including without limitation the rights
13  * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
14  * copies of the Software, and to permit persons to whom the Software is
15  * furnished to do so, subject to the following conditions:
16  *
17  * The above copyright notice and this permission notice shall be included in
18  * all copies or substantial portions of the Software.
19  *
20  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
21  * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
22  * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
23  * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
24  * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
25  * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
26  * THE SOFTWARE.
27  *
28  * @section DESCRIPTION
29  *
 * This file implements the REST client class.
31  */
32 
33 // clang-format off
34 #ifdef TILEDB_SERIALIZATION
35 #include "tiledb/sm/serialization/capnp_utils.h"
36 #include "tiledb/sm/serialization/array_schema_evolution.h"
37 #include "tiledb/sm/serialization/query.h"
38 #include "tiledb/sm/serialization/tiledb-rest.h"
39 #include "tiledb/sm/rest/curl.h" // must be included last to avoid Windows.h
40 #endif
41 // clang-format on
42 
43 // Something, somewhere seems to be defining TIME_MS as a macro
44 #if defined(TIME_MS)
45 #undef TIME_MS
46 #endif
47 
48 #include <cassert>
49 
50 #include "tiledb/common/logger.h"
51 #include "tiledb/sm/array/array.h"
52 #include "tiledb/sm/enums/query_type.h"
53 #include "tiledb/sm/misc/constants.h"
54 #include "tiledb/sm/misc/utils.h"
55 #include "tiledb/sm/query/query.h"
56 #include "tiledb/sm/rest/rest_client.h"
57 #include "tiledb/sm/serialization/array_schema.h"
58 
59 using namespace tiledb::common;
60 
61 namespace tiledb {
62 namespace sm {
63 
64 #ifdef TILEDB_SERIALIZATION
65 
RestClient()66 RestClient::RestClient()
67     : stats_(nullptr)
68     , config_(nullptr)
69     , compute_tp_(nullptr)
70     , resubmit_incomplete_(true) {
71   auto st = utils::parse::convert(
72       Config::REST_SERIALIZATION_DEFAULT_FORMAT, &serialization_type_);
73   assert(st.ok());
74   (void)st;
75 }
76 
init(stats::Stats * const parent_stats,const Config * config,ThreadPool * compute_tp)77 Status RestClient::init(
78     stats::Stats* const parent_stats,
79     const Config* config,
80     ThreadPool* compute_tp) {
81   if (config == nullptr)
82     return LOG_STATUS(
83         Status::RestError("Error initializing rest client; config is null."));
84 
85   stats_ = parent_stats->create_child("RestClient");
86 
87   config_ = config;
88   compute_tp_ = compute_tp;
89 
90   const char* c_str;
91   RETURN_NOT_OK(config_->get("rest.server_address", &c_str));
92   if (c_str != nullptr)
93     rest_server_ = std::string(c_str);
94   if (rest_server_.empty())
95     return LOG_STATUS(Status::RestError(
96         "Error initializing rest client; server address is empty."));
97 
98   RETURN_NOT_OK(config_->get("rest.server_serialization_format", &c_str));
99   if (c_str != nullptr)
100     RETURN_NOT_OK(serialization_type_enum(c_str, &serialization_type_));
101 
102   RETURN_NOT_OK(config_->get("rest.resubmit_incomplete", &c_str));
103   if (c_str != nullptr)
104     RETURN_NOT_OK(utils::parse::convert(c_str, &resubmit_incomplete_));
105 
106   return Status::Ok();
107 }
108 
set_header(const std::string & name,const std::string & value)109 Status RestClient::set_header(
110     const std::string& name, const std::string& value) {
111   extra_headers_[name] = value;
112   return Status::Ok();
113 }
114 
get_array_schema_from_rest(const URI & uri,ArraySchema ** array_schema)115 Status RestClient::get_array_schema_from_rest(
116     const URI& uri, ArraySchema** array_schema) {
117   // Init curl and form the URL
118   Curl curlc;
119   std::string array_ns, array_uri;
120   RETURN_NOT_OK(uri.get_rest_components(&array_ns, &array_uri));
121   const std::string cache_key = array_ns + ":" + array_uri;
122   RETURN_NOT_OK(
123       curlc.init(config_, extra_headers_, &redirect_meta_, &redirect_mtx_));
124   const std::string url = redirect_uri(cache_key) + "/v1/arrays/" + array_ns +
125                           "/" + curlc.url_escape(array_uri);
126 
127   // Get the data
128   Buffer returned_data;
129   RETURN_NOT_OK(curlc.get_data(
130       stats_, url, serialization_type_, &returned_data, cache_key));
131   if (returned_data.data() == nullptr || returned_data.size() == 0)
132     return LOG_STATUS(Status::RestError(
133         "Error getting array schema from REST; server returned no data."));
134 
135   return serialization::array_schema_deserialize(
136       array_schema, serialization_type_, returned_data);
137 }
138 
post_array_schema_to_rest(const URI & uri,ArraySchema * array_schema)139 Status RestClient::post_array_schema_to_rest(
140     const URI& uri, ArraySchema* array_schema) {
141   Buffer buff;
142   RETURN_NOT_OK(serialization::array_schema_serialize(
143       array_schema, serialization_type_, &buff, false));
144   // Wrap in a list
145   BufferList serialized;
146   RETURN_NOT_OK(serialized.add_buffer(std::move(buff)));
147 
148   bool found = false;
149   const std::string creation_access_credentials_name =
150       config_->get("rest.creation_access_credentials_name", &found);
151   if (found)
152     RETURN_NOT_OK(set_header(
153         "X-TILEDB-CLOUD-ACCESS-CREDENTIALS-NAME",
154         creation_access_credentials_name));
155 
156   // Init curl and form the URL
157   Curl curlc;
158   std::string array_ns, array_uri;
159   RETURN_NOT_OK(uri.get_rest_components(&array_ns, &array_uri));
160   const std::string cache_key = array_ns + ":" + array_uri;
161   RETURN_NOT_OK(
162       curlc.init(config_, extra_headers_, &redirect_meta_, &redirect_mtx_));
163   auto deduced_url = redirect_uri(cache_key) + "/v1/arrays/" + array_ns + "/" +
164                      curlc.url_escape(array_uri);
165   Buffer returned_data;
166   const Status sc = curlc.post_data(
167       stats_,
168       deduced_url,
169       serialization_type_,
170       &serialized,
171       &returned_data,
172       cache_key);
173   return sc;
174 }
175 
deregister_array_from_rest(const URI & uri)176 Status RestClient::deregister_array_from_rest(const URI& uri) {
177   // Init curl and form the URL
178   Curl curlc;
179   std::string array_ns, array_uri;
180   RETURN_NOT_OK(uri.get_rest_components(&array_ns, &array_uri));
181   const std::string cache_key = array_ns + ":" + array_uri;
182   RETURN_NOT_OK(
183       curlc.init(config_, extra_headers_, &redirect_meta_, &redirect_mtx_));
184   const std::string url = redirect_uri(cache_key) + "/v1/arrays/" + array_ns +
185                           "/" + curlc.url_escape(array_uri) + "/deregister";
186 
187   Buffer returned_data;
188   return curlc.delete_data(
189       stats_, url, serialization_type_, &returned_data, cache_key);
190 }
191 
get_array_non_empty_domain(Array * array,uint64_t timestamp_start,uint64_t timestamp_end)192 Status RestClient::get_array_non_empty_domain(
193     Array* array, uint64_t timestamp_start, uint64_t timestamp_end) {
194   if (array == nullptr)
195     return LOG_STATUS(
196         Status::RestError("Cannot get array non-empty domain; array is null"));
197   if (array->array_uri().to_string().empty())
198     return LOG_STATUS(Status::RestError(
199         "Cannot get array non-empty domain; array URI is empty"));
200 
201   // Init curl and form the URL
202   Curl curlc;
203   std::string array_ns, array_uri;
204   RETURN_NOT_OK(array->array_uri().get_rest_components(&array_ns, &array_uri));
205   const std::string cache_key = array_ns + ":" + array_uri;
206   RETURN_NOT_OK(
207       curlc.init(config_, extra_headers_, &redirect_meta_, &redirect_mtx_));
208   const std::string url = redirect_uri(cache_key) + "/v2/arrays/" + array_ns +
209                           "/" + curlc.url_escape(array_uri) +
210                           "/non_empty_domain?" +
211                           "start_timestamp=" + std::to_string(timestamp_start) +
212                           "&end_timestamp=" + std::to_string(timestamp_end);
213 
214   // Get the data
215   Buffer returned_data;
216   RETURN_NOT_OK(curlc.get_data(
217       stats_, url, serialization_type_, &returned_data, cache_key));
218 
219   if (returned_data.data() == nullptr || returned_data.size() == 0)
220     return LOG_STATUS(
221         Status::RestError("Error getting array non-empty domain "
222                           "from REST; server returned no data."));
223 
224   // Deserialize data returned
225   return serialization::nonempty_domain_deserialize(
226       array, returned_data, serialization_type_);
227 }
228 
get_array_max_buffer_sizes(const URI & uri,const ArraySchema * schema,const void * subarray,std::unordered_map<std::string,std::pair<uint64_t,uint64_t>> * buffer_sizes)229 Status RestClient::get_array_max_buffer_sizes(
230     const URI& uri,
231     const ArraySchema* schema,
232     const void* subarray,
233     std::unordered_map<std::string, std::pair<uint64_t, uint64_t>>*
234         buffer_sizes) {
235   // Convert subarray to string for query parameter
236   std::string subarray_str;
237   RETURN_NOT_OK(subarray_to_str(schema, subarray, &subarray_str));
238   std::string subarray_query_param =
239       subarray_str.empty() ? "" : ("?subarray=" + subarray_str);
240 
241   // Init curl and form the URL
242   Curl curlc;
243   std::string array_ns, array_uri;
244   RETURN_NOT_OK(uri.get_rest_components(&array_ns, &array_uri));
245   const std::string cache_key = array_ns + ":" + array_uri;
246   RETURN_NOT_OK(
247       curlc.init(config_, extra_headers_, &redirect_meta_, &redirect_mtx_));
248   const std::string url = redirect_uri(cache_key) + "/v1/arrays/" + array_ns +
249                           "/" + curlc.url_escape(array_uri) +
250                           "/max_buffer_sizes" + subarray_query_param;
251 
252   // Get the data
253   Buffer returned_data;
254   RETURN_NOT_OK(curlc.get_data(
255       stats_, url, serialization_type_, &returned_data, cache_key));
256 
257   if (returned_data.data() == nullptr || returned_data.size() == 0)
258     return LOG_STATUS(
259         Status::RestError("Error getting array max buffer sizes "
260                           "from REST; server returned no data."));
261 
262   // Deserialize data returned
263   return serialization::max_buffer_sizes_deserialize(
264       schema, returned_data, serialization_type_, buffer_sizes);
265 }
266 
get_array_metadata_from_rest(const URI & uri,uint64_t timestamp_start,uint64_t timestamp_end,Array * array)267 Status RestClient::get_array_metadata_from_rest(
268     const URI& uri,
269     uint64_t timestamp_start,
270     uint64_t timestamp_end,
271     Array* array) {
272   if (array == nullptr)
273     return LOG_STATUS(Status::RestError(
274         "Error getting array metadata from REST; array is null."));
275 
276   // Init curl and form the URL
277   Curl curlc;
278   std::string array_ns, array_uri;
279   RETURN_NOT_OK(uri.get_rest_components(&array_ns, &array_uri));
280   const std::string cache_key = array_ns + ":" + array_uri;
281   RETURN_NOT_OK(
282       curlc.init(config_, extra_headers_, &redirect_meta_, &redirect_mtx_));
283   const std::string url = redirect_uri(cache_key) + "/v1/arrays/" + array_ns +
284                           "/" + curlc.url_escape(array_uri) +
285                           "/array_metadata?" +
286                           "start_timestamp=" + std::to_string(timestamp_start) +
287                           "&end_timestamp=" + std::to_string(timestamp_end);
288 
289   // Get the data
290   Buffer returned_data;
291   RETURN_NOT_OK(curlc.get_data(
292       stats_, url, serialization_type_, &returned_data, cache_key));
293   if (returned_data.data() == nullptr || returned_data.size() == 0)
294     return LOG_STATUS(Status::RestError(
295         "Error getting array metadata from REST; server returned no data."));
296 
297   return serialization::array_metadata_deserialize(
298       array, serialization_type_, returned_data);
299 }
300 
post_array_metadata_to_rest(const URI & uri,uint64_t timestamp_start,uint64_t timestamp_end,Array * array)301 Status RestClient::post_array_metadata_to_rest(
302     const URI& uri,
303     uint64_t timestamp_start,
304     uint64_t timestamp_end,
305     Array* array) {
306   if (array == nullptr)
307     return LOG_STATUS(Status::RestError(
308         "Error posting array metadata to REST; array is null."));
309 
310   Buffer buff;
311   RETURN_NOT_OK(serialization::array_metadata_serialize(
312       array, serialization_type_, &buff));
313   // Wrap in a list
314   BufferList serialized;
315   RETURN_NOT_OK(serialized.add_buffer(std::move(buff)));
316 
317   // Init curl and form the URL
318   Curl curlc;
319   std::string array_ns, array_uri;
320   RETURN_NOT_OK(uri.get_rest_components(&array_ns, &array_uri));
321   const std::string cache_key = array_ns + ":" + array_uri;
322   RETURN_NOT_OK(
323       curlc.init(config_, extra_headers_, &redirect_meta_, &redirect_mtx_));
324   const std::string url = redirect_uri(cache_key) + "/v1/arrays/" + array_ns +
325                           "/" + curlc.url_escape(array_uri) +
326                           "/array_metadata?" +
327                           "start_timestamp=" + std::to_string(timestamp_start) +
328                           "&end_timestamp=" + std::to_string(timestamp_end);
329 
330   // Put the data
331   Buffer returned_data;
332   return curlc.post_data(
333       stats_, url, serialization_type_, &serialized, &returned_data, cache_key);
334 }
335 
submit_query_to_rest(const URI & uri,Query * query)336 Status RestClient::submit_query_to_rest(const URI& uri, Query* query) {
337   // Local state tracking for the current offsets into the user's query buffers.
338   // This allows resubmission of incomplete queries while appending to the
339   // same user buffers.
340   serialization::CopyState copy_state;
341 
342   RETURN_NOT_OK(post_query_submit(uri, query, &copy_state));
343 
344   // Now need to update the buffer sizes to the actual copied data size so that
345   // the user can check the result size on reads.
346   RETURN_NOT_OK(update_attribute_buffer_sizes(copy_state, query));
347 
348   return Status::Ok();
349 }
350 
post_query_submit(const URI & uri,Query * query,serialization::CopyState * copy_state)351 Status RestClient::post_query_submit(
352     const URI& uri, Query* query, serialization::CopyState* copy_state) {
353   // Get array
354   const Array* array = query->array();
355   if (array == nullptr) {
356     return LOG_STATUS(
357         Status::RestError("Error submitting query to REST; null array."));
358   }
359 
360   auto rest_scratch = query->rest_scratch();
361 
362   if (rest_scratch->size() > 0) {
363     bool skip;
364     query_post_call_back(
365         false, nullptr, 0, &skip, rest_scratch, query, copy_state);
366   }
367 
368   // Serialize query to send
369   BufferList serialized;
370   RETURN_NOT_OK(serialization::query_serialize(
371       query, serialization_type_, true, &serialized));
372 
373   // Init curl and form the URL
374   Curl curlc;
375   std::string array_ns, array_uri;
376   RETURN_NOT_OK(uri.get_rest_components(&array_ns, &array_uri));
377   const std::string cache_key = array_ns + ":" + array_uri;
378   RETURN_NOT_OK(
379       curlc.init(config_, extra_headers_, &redirect_meta_, &redirect_mtx_));
380   std::string url = redirect_uri(cache_key) + "/v2/arrays/" + array_ns + "/" +
381                     curlc.url_escape(array_uri) +
382                     "/query/submit?type=" + query_type_str(query->type()) +
383                     "&read_all=" + (resubmit_incomplete_ ? "true" : "false");
384 
385   // Remote array reads always supply the timestamp.
386   url += "&start_timestamp=" + std::to_string(array->timestamp_start());
387   url += "&end_timestamp=" + std::to_string(array->timestamp_end());
388 
389   // Create the callback that will process the response buffers as they
390   // are received.
391   auto write_cb = std::bind(
392       &RestClient::query_post_call_back,
393       this,
394       std::placeholders::_1,
395       std::placeholders::_2,
396       std::placeholders::_3,
397       std::placeholders::_4,
398       rest_scratch,
399       query,
400       copy_state);
401 
402   const Status st = curlc.post_data(
403       stats_,
404       url,
405       serialization_type_,
406       &serialized,
407       rest_scratch.get(),
408       std::move(write_cb),
409       cache_key);
410 
411   if (!st.ok() && copy_state->empty()) {
412     return LOG_STATUS(Status::RestError(
413         "Error submitting query to REST; "
414         "server returned no data. "
415         "Curl error: " +
416         st.message()));
417   }
418 
419   return st;
420 }
421 
query_post_call_back(const bool reset,void * const contents,const size_t content_nbytes,bool * const skip_retries,tdb_shared_ptr<Buffer> scratch,Query * query,serialization::CopyState * copy_state)422 size_t RestClient::query_post_call_back(
423     const bool reset,
424     void* const contents,
425     const size_t content_nbytes,
426     bool* const skip_retries,
427     tdb_shared_ptr<Buffer> scratch,
428     Query* query,
429     serialization::CopyState* copy_state) {
430   // All return statements in this function must pass through this wrapper.
431   // This is responsible for two things:
432   // 1. The 'bytes_processed' may be negative in error scenarios. The negative
433   //    number is only used for convenience during processing. We must restrict
434   //    the return value to >= 0 before returning.
435   // 2. When our return value ('bytes_processed') does not match the number of
436   //    input bytes ('content_nbytes'), The CURL layer that invoked this
437   //    callback will interpret this as an error and may attempt to retry. We
438   //    specifically want to prevent CURL from retrying the request if we have
439   //    reached this callback. If we encounter an error within this callback,
440   //    the issue is with the response data itself and not an issue with
441   //    transporting the response data (for example, the common issue will be
442   //    deserialization failure). In this scenario, we will waste time retrying
443   //    on response data that we know we cannot handle without error.
444   auto return_wrapper = [content_nbytes, skip_retries](long bytes_processed) {
445     bytes_processed = std::max(bytes_processed, 0L);
446     if (static_cast<size_t>(bytes_processed) != content_nbytes) {
447       *skip_retries = true;
448     }
449     return bytes_processed;
450   };
451 
452   // This is the return value that represents the amount of bytes processed
453   // in 'contents'. This will act as the return value and will always be
454   // less-than-or-equal-to 'content_nbytes'.
455   long bytes_processed = 0;
456 
457   // When 'reset' is true, we must discard the in-progress memory state.
458   // The most likely scenario is that the request failed and was retried
459   // from within the Curl object.
460   if (reset) {
461     scratch->set_size(0);
462     scratch->reset_offset();
463     copy_state->clear();
464   }
465 
466   // If the current scratch size is non-empty, we must subtract its size
467   // from 'bytes_processed' so that we do not count bytes processed from
468   // a previous callback.
469   bytes_processed -= scratch->size();
470 
471   // Copy 'contents' to the end of 'scratch'. As a future optimization, if
472   // 'scratch' is empty, we could attempt to process 'contents' in-place and
473   // only copy the remaining, unprocessed bytes into 'scratch'.
474   scratch->set_offset(scratch->size());
475   Status st = scratch->write(contents, content_nbytes);
476   if (!st.ok()) {
477     LOG_ERROR(
478         "Cannot copy libcurl response data; buffer write failed: " +
479         st.to_string());
480     return return_wrapper(bytes_processed);
481   }
482 
483   // Process all of the serialized queries contained within 'scratch'.
484   scratch->reset_offset();
485   while (scratch->offset() < scratch->size()) {
486     // We need at least 8 bytes to determine the size of the next
487     // serialized query.
488     if (scratch->offset() + 8 > scratch->size()) {
489       break;
490     }
491 
492     // Decode the query size. We could cache this from the previous
493     // callback to prevent decoding the same prefix multiple times.
494     const uint64_t query_size =
495         utils::endianness::decode_le<uint64_t>(scratch->cur_data());
496 
497     // We must have the full serialized query before attempting to
498     // deserialize it.
499     if (scratch->offset() + 8 + query_size > scratch->size()) {
500       break;
501     }
502 
503     // At this point of execution, we know that we the next serialized
504     // query is entirely in 'scratch'. For convenience, we will advance
505     // the offset to point to the start of the serialized query.
506     scratch->advance_offset(8);
507 
508     // We can only deserialize the query if it is 8-byte aligned. If the
509     // offset is 8-byte aligned, we can deserialize the query in-place.
510     // Otherwise, we must make a copy to an auxiliary buffer.
511     if (scratch->offset() % 8 != 0) {
512       // Copy the entire serialized buffer to a newly allocated, 8-byte
513       // aligned auxiliary buffer.
514       Buffer aux;
515       st = aux.write(scratch->cur_data(), query_size);
516       if (!st.ok()) {
517         scratch->set_offset(scratch->offset() - 8);
518         return return_wrapper(bytes_processed);
519       }
520 
521       // Deserialize the buffer and store it in 'copy_state'. If
522       // the user buffers are too small to accomodate the attribute
523       // data when deserializing read queries, this will return an
524       // error status.
525       aux.reset_offset();
526       st = serialization::query_deserialize(
527           aux, serialization_type_, true, copy_state, query, compute_tp_);
528       if (!st.ok()) {
529         scratch->set_offset(scratch->offset() - 8);
530         return return_wrapper(bytes_processed);
531       }
532     } else {
533       // Deserialize the buffer and store it in 'copy_state'. If
534       // the user buffers are too small to accomodate the attribute
535       // data when deserializing read queries, this will return an
536       // error status.
537       st = serialization::query_deserialize(
538           *scratch, serialization_type_, true, copy_state, query, compute_tp_);
539       if (!st.ok()) {
540         scratch->set_offset(scratch->offset() - 8);
541         return return_wrapper(bytes_processed);
542       }
543     }
544 
545     scratch->advance_offset(query_size);
546     bytes_processed += (query_size + 8);
547   }
548 
549   // If there are unprocessed bytes left in the scratch space, copy them
550   // to the beginning of 'scratch'. The intent is to reduce memory
551   // consumption by overwriting the serialized query objects that we
552   // have already processed.
553   const uint64_t length = scratch->size() - scratch->offset();
554   if (scratch->offset() != 0 && length != 0) {
555     const uint64_t offset = scratch->offset();
556     scratch->reset_offset();
557 
558     // When the length of the remaining bytes is less than offset,
559     // we can safely read the remaining bytes from 'scratch' and
560     // write them to the beginning of 'scratch' because there will
561     // not be an overlap in accessed memory between the source
562     // and destination. Otherwise, we must use an auxilary buffer
563     // to temporarily store the remaining bytes because the behavior
564     // of the 'memcpy' used 'Buffer::write' will be undefined because
565     // there will be an overlap in the memory of the source and
566     // destination.
567     if (length <= offset) {
568       scratch->reset_size();
569       st = scratch->write(scratch->data(offset), length);
570     } else {
571       Buffer aux;
572       st = aux.write(scratch->data(offset), length);
573       if (st.ok()) {
574         scratch->reset_size();
575         st = scratch->write(aux.data(), aux.size());
576       }
577     }
578 
579     assert(st.ok());
580     if (!st.ok()) {
581       LOG_STATUS(st);
582     }
583     assert(scratch->size() == length);
584   }
585 
586   bytes_processed += length;
587 
588   assert(static_cast<size_t>(bytes_processed) == content_nbytes);
589   return return_wrapper(bytes_processed);
590 }
591 
finalize_query_to_rest(const URI & uri,Query * query)592 Status RestClient::finalize_query_to_rest(const URI& uri, Query* query) {
593   // Serialize data to send
594   BufferList serialized;
595   RETURN_NOT_OK(serialization::query_serialize(
596       query, serialization_type_, true, &serialized));
597 
598   // Init curl and form the URL
599   Curl curlc;
600   std::string array_ns, array_uri;
601   RETURN_NOT_OK(uri.get_rest_components(&array_ns, &array_uri));
602   const std::string cache_key = array_ns + ":" + array_uri;
603   RETURN_NOT_OK(
604       curlc.init(config_, extra_headers_, &redirect_meta_, &redirect_mtx_));
605   const std::string url =
606       redirect_uri(cache_key) + "/v1/arrays/" + array_ns + "/" +
607       curlc.url_escape(array_uri) +
608       "/query/finalize?type=" + query_type_str(query->type());
609   Buffer returned_data;
610   RETURN_NOT_OK(curlc.post_data(
611       stats_,
612       url,
613       serialization_type_,
614       &serialized,
615       &returned_data,
616       cache_key));
617 
618   if (returned_data.data() == nullptr || returned_data.size() == 0)
619     return LOG_STATUS(
620         Status::RestError("Error finalizing query; server returned no data."));
621 
622   // Deserialize data returned
623   returned_data.reset_offset();
624   return serialization::query_deserialize(
625       returned_data, serialization_type_, true, nullptr, query, compute_tp_);
626 }
627 
subarray_to_str(const ArraySchema * schema,const void * subarray,std::string * subarray_str)628 Status RestClient::subarray_to_str(
629     const ArraySchema* schema,
630     const void* subarray,
631     std::string* subarray_str) {
632   const auto coords_type = schema->dimension(0)->type();
633   const auto dim_num = schema->dim_num();
634   const auto subarray_nelts = 2 * dim_num;
635 
636   if (subarray == nullptr) {
637     *subarray_str = "";
638     return Status::Ok();
639   }
640 
641   std::stringstream ss;
642   for (unsigned i = 0; i < subarray_nelts; i++) {
643     switch (coords_type) {
644       case Datatype::INT8:
645         ss << ((const int8_t*)subarray)[i];
646         break;
647       case Datatype::UINT8:
648         ss << ((const uint8_t*)subarray)[i];
649         break;
650       case Datatype::INT16:
651         ss << ((const int16_t*)subarray)[i];
652         break;
653       case Datatype::UINT16:
654         ss << ((const uint16_t*)subarray)[i];
655         break;
656       case Datatype::INT32:
657         ss << ((const int32_t*)subarray)[i];
658         break;
659       case Datatype::UINT32:
660         ss << ((const uint32_t*)subarray)[i];
661         break;
662       case Datatype::DATETIME_YEAR:
663       case Datatype::DATETIME_MONTH:
664       case Datatype::DATETIME_WEEK:
665       case Datatype::DATETIME_DAY:
666       case Datatype::DATETIME_HR:
667       case Datatype::DATETIME_MIN:
668       case Datatype::DATETIME_SEC:
669       case Datatype::DATETIME_MS:
670       case Datatype::DATETIME_US:
671       case Datatype::DATETIME_NS:
672       case Datatype::DATETIME_PS:
673       case Datatype::DATETIME_FS:
674       case Datatype::DATETIME_AS:
675       case Datatype::TIME_HR:
676       case Datatype::TIME_MIN:
677       case Datatype::TIME_SEC:
678       case Datatype::TIME_MS:
679       case Datatype::TIME_US:
680       case Datatype::TIME_NS:
681       case Datatype::TIME_PS:
682       case Datatype::TIME_FS:
683       case Datatype::TIME_AS:
684       case Datatype::INT64:
685         ss << ((const int64_t*)subarray)[i];
686         break;
687       case Datatype::UINT64:
688         ss << ((const uint64_t*)subarray)[i];
689         break;
690       case Datatype::FLOAT32:
691         ss << ((const float*)subarray)[i];
692         break;
693       case Datatype::FLOAT64:
694         ss << ((const double*)subarray)[i];
695         break;
696       default:
697         return LOG_STATUS(Status::RestError(
698             "Error converting subarray to string; unhandled datatype."));
699     }
700 
701     if (i < subarray_nelts - 1)
702       ss << ",";
703   }
704 
705   *subarray_str = ss.str();
706 
707   return Status::Ok();
708 }
709 
update_attribute_buffer_sizes(const serialization::CopyState & copy_state,Query * query) const710 Status RestClient::update_attribute_buffer_sizes(
711     const serialization::CopyState& copy_state, Query* query) const {
712   // Applicable only to reads
713   if (query->type() != QueryType::READ)
714     return Status::Ok();
715 
716   for (const auto& cit : copy_state) {
717     const auto& name = cit.first;
718     auto state = cit.second;
719     auto query_buffer = query->buffer(name);
720     if (query_buffer.buffer_var_size_ != nullptr) {
721       *query_buffer.buffer_var_size_ = state.data_size;
722       *query_buffer.buffer_size_ = state.offset_size;
723     } else if (query_buffer.buffer_size_ != nullptr)
724       *query_buffer.buffer_size_ = state.data_size;
725   }
726 
727   return Status::Ok();
728 }
729 
get_query_est_result_sizes(const URI & uri,Query * query)730 Status RestClient::get_query_est_result_sizes(const URI& uri, Query* query) {
731   if (query == nullptr)
732     return LOG_STATUS(Status::RestError(
733         "Error getting query estimated result size from REST; Query is null."));
734 
735   // Get array
736   const Array* array = query->array();
737   if (array == nullptr) {
738     return LOG_STATUS(Status::RestError(
739         "Error festing query estimated result size from REST; null array."));
740   }
741 
742   // Serialize query to send
743   BufferList serialized;
744   RETURN_NOT_OK(serialization::query_serialize(
745       query, serialization_type_, true, &serialized));
746 
747   // Init curl and form the URL
748   Curl curlc;
749   std::string array_ns, array_uri;
750   RETURN_NOT_OK(uri.get_rest_components(&array_ns, &array_uri));
751   const std::string cache_key = array_ns + ":" + array_uri;
752   RETURN_NOT_OK(
753       curlc.init(config_, extra_headers_, &redirect_meta_, &redirect_mtx_));
754   std::string url =
755       redirect_uri(cache_key) + "/v1/arrays/" + array_ns + "/" +
756       curlc.url_escape(array_uri) +
757       "/query/est_result_sizes?type=" + query_type_str(query->type());
758 
759   // Remote array reads always supply the timestamp.
760   if (query->type() == QueryType::READ) {
761     url += "&start_timestamp=" + std::to_string(array->timestamp_start());
762     url += "&end_timestamp=" + std::to_string(array->timestamp_end());
763   }
764 
765   // Get the data
766   Buffer returned_data;
767   RETURN_NOT_OK(curlc.post_data(
768       stats_,
769       url,
770       serialization_type_,
771       &serialized,
772       &returned_data,
773       cache_key));
774   if (returned_data.data() == nullptr || returned_data.size() == 0)
775     return LOG_STATUS(Status::RestError(
776         "Error getting array metadata from REST; server returned no data."));
777 
778   return serialization::query_est_result_size_deserialize(
779       query, serialization_type_, true, returned_data);
780 }
781 
redirect_uri(const std::string & cache_key)782 std::string RestClient::redirect_uri(const std::string& cache_key) {
783   std::unique_lock<std::mutex> rd_lck(redirect_mtx_);
784   std::unordered_map<std::string, std::string>::const_iterator cache_it =
785       redirect_meta_.find(cache_key);
786 
787   return (cache_it == redirect_meta_.end()) ? rest_server_ : cache_it->second;
788 }
789 
post_array_schema_evolution_to_rest(const URI & uri,ArraySchemaEvolution * array_schema_evolution)790 Status RestClient::post_array_schema_evolution_to_rest(
791     const URI& uri, ArraySchemaEvolution* array_schema_evolution) {
792   Buffer buff;
793   RETURN_NOT_OK(serialization::array_schema_evolution_serialize(
794       array_schema_evolution, serialization_type_, &buff, false));
795   // Wrap in a list
796   BufferList serialized;
797   RETURN_NOT_OK(serialized.add_buffer(std::move(buff)));
798 
799   // Init curl and form the URL
800   Curl curlc;
801   std::string array_ns, array_uri;
802   RETURN_NOT_OK(uri.get_rest_components(&array_ns, &array_uri));
803   const std::string cache_key = array_ns + ":" + array_uri;
804   RETURN_NOT_OK(
805       curlc.init(config_, extra_headers_, &redirect_meta_, &redirect_mtx_));
806   auto deduced_url = redirect_uri(cache_key) + "/v1/arrays/" + array_ns + "/" +
807                      curlc.url_escape(array_uri) + "/evolve";
808   Buffer returned_data;
809   const Status sc = curlc.post_data(
810       stats_,
811       deduced_url,
812       serialization_type_,
813       &serialized,
814       &returned_data,
815       cache_key);
816   return sc;
817 }
818 
819 #else
820 
821 RestClient::RestClient() {
822   (void)config_;
823   (void)rest_server_;
824   (void)serialization_type_;
825 }
826 
827 Status RestClient::init(stats::Stats*, const Config*, ThreadPool*) {
828   return LOG_STATUS(
829       Status::RestError("Cannot use rest client; serialization not enabled."));
830 }
831 
832 Status RestClient::set_header(const std::string&, const std::string&) {
833   return LOG_STATUS(
834       Status::RestError("Cannot use rest client; serialization not enabled."));
835 }
836 
837 Status RestClient::get_array_schema_from_rest(const URI&, ArraySchema**) {
838   return LOG_STATUS(
839       Status::RestError("Cannot use rest client; serialization not enabled."));
840 }
841 
842 Status RestClient::post_array_schema_to_rest(const URI&, ArraySchema*) {
843   return LOG_STATUS(
844       Status::RestError("Cannot use rest client; serialization not enabled."));
845 }
846 
847 Status RestClient::deregister_array_from_rest(const URI&) {
848   return LOG_STATUS(
849       Status::RestError("Cannot use rest client; serialization not enabled."));
850 }
851 
852 Status RestClient::get_array_non_empty_domain(Array*, uint64_t, uint64_t) {
853   return LOG_STATUS(
854       Status::RestError("Cannot use rest client; serialization not enabled."));
855 }
856 
857 Status RestClient::get_array_max_buffer_sizes(
858     const URI&,
859     const ArraySchema*,
860     const void*,
861     std::unordered_map<std::string, std::pair<uint64_t, uint64_t>>*) {
862   return LOG_STATUS(
863       Status::RestError("Cannot use rest client; serialization not enabled."));
864 }
865 
866 Status RestClient::get_array_metadata_from_rest(
867     const URI&, uint64_t, uint64_t, Array*) {
868   return LOG_STATUS(
869       Status::RestError("Cannot use rest client; serialization not enabled."));
870 }
871 
872 Status RestClient::post_array_metadata_to_rest(
873     const URI&, uint64_t, uint64_t, Array*) {
874   return LOG_STATUS(
875       Status::RestError("Cannot use rest client; serialization not enabled."));
876 }
877 
878 Status RestClient::submit_query_to_rest(const URI&, Query*) {
879   return LOG_STATUS(
880       Status::RestError("Cannot use rest client; serialization not enabled."));
881 }
882 
883 Status RestClient::finalize_query_to_rest(const URI&, Query*) {
884   return LOG_STATUS(
885       Status::RestError("Cannot use rest client; serialization not enabled."));
886 }
887 
888 Status RestClient::get_query_est_result_sizes(const URI&, Query*) {
889   return LOG_STATUS(
890       Status::RestError("Cannot use rest client; serialization not enabled."));
891 }
892 
893 Status RestClient::post_array_schema_evolution_to_rest(
894     const URI&, ArraySchemaEvolution*) {
895   return LOG_STATUS(
896       Status::RestError("Cannot use rest client; serialization not enabled."));
897 }
898 
899 #endif  // TILEDB_SERIALIZATION
900 
901 }  // namespace sm
902 }  // namespace tiledb
903