1 /**
2  * @file   azure.cc
3  *
4  * @section LICENSE
5  *
6  * The MIT License
7  *
8  * @copyright Copyright (c) 2017-2021 TileDB, Inc.
9  *
10  * Permission is hereby granted, free of charge, to any person obtaining a copy
11  * of this software and associated documentation files (the "Software"), to deal
12  * in the Software without restriction, including without limitation the rights
13  * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
14  * copies of the Software, and to permit persons to whom the Software is
15  * furnished to do so, subject to the following conditions:
16  *
17  * The above copyright notice and this permission notice shall be included in
18  * all copies or substantial portions of the Software.
19  *
20  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
21  * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
22  * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
23  * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
24  * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
25  * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
26  * THE SOFTWARE.
27  *
28  * @section DESCRIPTION
29  *
30  * This file implements the Azure class.
31  */
32 
33 #ifdef HAVE_AZURE
34 
35 #if !defined(NOMINMAX)
36 #define NOMINMAX  // avoid min/max macros from windows headers
37 #endif
38 
39 #include <put_block_list_request_base.h>
40 #include <future>
41 
42 #include "tiledb/common/logger.h"
43 #include "tiledb/sm/filesystem/azure.h"
44 #include "tiledb/sm/global_state/global_state.h"
45 #include "tiledb/sm/misc/utils.h"
46 
47 using namespace tiledb::common;
48 
49 namespace tiledb {
50 namespace sm {
51 
52 /* ********************************* */
53 /*     CONSTRUCTORS & DESTRUCTORS    */
54 /* ********************************* */
55 
Azure()56 Azure::Azure()
57     : write_cache_max_size_(0)
58     , max_parallel_ops_(1)
59     , block_list_block_size_(0)
60     , use_block_list_upload_(false) {
61 }
62 
~Azure()63 Azure::~Azure() {
64 }
65 
66 /* ********************************* */
67 /*                 API               */
68 /* ********************************* */
69 
init(const Config & config,ThreadPool * const thread_pool)70 Status Azure::init(const Config& config, ThreadPool* const thread_pool) {
71   if (thread_pool == nullptr) {
72     return LOG_STATUS(
73         Status::AzureError("Can't initialize with null thread pool."));
74   }
75 
76   thread_pool_ = thread_pool;
77 
78   bool found;
79   char* tmp = NULL;
80 
81   std::string account_name =
82       config.get("vfs.azure.storage_account_name", &found);
83   assert(found);
84   if (account_name.empty() &&
85       ((tmp = getenv("AZURE_STORAGE_ACCOUNT")) != NULL)) {
86     account_name = std::string(tmp);
87   }
88 
89   std::string account_key = config.get("vfs.azure.storage_account_key", &found);
90   assert(found);
91   if (account_key.empty() && ((tmp = getenv("AZURE_STORAGE_KEY")) != NULL)) {
92     account_key = std::string(getenv("AZURE_STORAGE_KEY"));
93   }
94 
95   std::string sas_token = config.get("vfs.azure.storage_sas_token", &found);
96   assert(found);
97   if (sas_token.empty() &&
98       ((tmp = getenv("AZURE_STORAGE_SAS_TOKEN")) != NULL)) {
99     sas_token = std::string(getenv("AZURE_STORAGE_SAS_TOKEN"));
100   }
101 
102   std::string blob_endpoint = config.get("vfs.azure.blob_endpoint", &found);
103   assert(found);
104   if (blob_endpoint.empty() &&
105       ((tmp = getenv("AZURE_BLOB_ENDPOINT")) != NULL)) {
106     blob_endpoint = std::string(getenv("AZURE_BLOB_ENDPOINT"));
107   }
108 
109   bool use_https;
110   RETURN_NOT_OK(config.get<bool>("vfs.azure.use_https", &use_https, &found));
111   assert(found);
112   // note that the default here is use_https=true, so the logic below is
113   // inverted
114   if (use_https && ((tmp = getenv("AZURE_USE_HTTPS")) != NULL)) {
115     std::string use_https_env(tmp);
116     use_https = (!use_https_env.empty());
117   }
118 
119   RETURN_NOT_OK(config.get<uint64_t>(
120       "vfs.azure.max_parallel_ops", &max_parallel_ops_, &found));
121   assert(found);
122   RETURN_NOT_OK(config.get<uint64_t>(
123       "vfs.azure.block_list_block_size", &block_list_block_size_, &found));
124   assert(found);
125   RETURN_NOT_OK(config.get<bool>(
126       "vfs.azure.use_block_list_upload", &use_block_list_upload_, &found));
127   assert(found);
128 
129   write_cache_max_size_ = max_parallel_ops_ * block_list_block_size_;
130 
131   // Initialize a credential object
132   std::shared_ptr<azure::storage_lite::storage_credential> credential =
133       std::make_shared<azure::storage_lite::shared_key_credential>(
134           account_name, account_key);
135 
136   // Use the SAS token, if provided.
137   if (!sas_token.empty()) {
138     // clang-format off
139     credential = std::make_shared<azure::storage_lite::shared_access_signature_credential>(sas_token);
140     // clang-format on
141   }
142 
143   std::shared_ptr<azure::storage_lite::storage_account> account =
144       std::make_shared<azure::storage_lite::storage_account>(
145           account_name, credential, use_https, blob_endpoint);
146 
147   // Construct the Azure SDK blob client with a concurrency level
148   // equal to 'thread_pool_->concurrency_level'. Internally, the client
149   // will allocate an equal number of libcurl sessions with
150   // 'curl_easy_init'. This ensures that our 'thread_pool_' threads
151   // will not block on the blob client's internal request queue
152   // unless the user is performing concurrent I/O on this instance.
153 #ifdef __linux__
154   // Get CA Cert bundle file from global state. This is initialized and cached
155   // if detected. We have only had issues with finding the certificate path on
156   // Linux.
157   const std::string cert_file =
158       global_state::GlobalState::GetGlobalState().cert_file();
159   client_ = tdb_make_shared(
160       azure::storage_lite::blob_client,
161       account,
162       thread_pool_->concurrency_level(),
163       cert_file);
164 #else
165   client_ = tdb_make_shared(
166       azure::storage_lite::blob_client,
167       account,
168       thread_pool_->concurrency_level());
169 #endif
170 
171   // The Azure SDK does not provide a way to configure the retry
172   // policy or construct a client with our own retry policy. This
173   // re-assigns the context with our own retry policy.
174   *client_->context() = azure::storage_lite::executor_context(
175       std::make_shared<azure::storage_lite::tinyxml2_parser>(),
176       std::make_shared<AzureRetryPolicy>());
177 
178   return Status::Ok();
179 }
180 
create_container(const URI & uri) const181 Status Azure::create_container(const URI& uri) const {
182   assert(client_);
183 
184   if (!uri.is_azure()) {
185     return LOG_STATUS(Status::AzureError(
186         std::string("URI is not an Azure URI: " + uri.to_string())));
187   }
188 
189   std::string container_name;
190   RETURN_NOT_OK(parse_azure_uri(uri, &container_name, nullptr));
191 
192   std::future<azure::storage_lite::storage_outcome<void>> result =
193       client_->create_container(container_name);
194   if (!result.valid()) {
195     return LOG_STATUS(Status::AzureError(
196         std::string("Create container failed on: " + uri.to_string())));
197   }
198 
199   azure::storage_lite::storage_outcome<void> outcome = result.get();
200   if (!outcome.success()) {
201     return LOG_STATUS(Status::AzureError(
202         std::string("Create container failed on: " + uri.to_string())));
203   }
204 
205   return wait_for_container_to_propagate(container_name);
206 }
207 
wait_for_container_to_propagate(const std::string & container_name) const208 Status Azure::wait_for_container_to_propagate(
209     const std::string& container_name) const {
210   unsigned attempts = 0;
211   while (attempts++ < constants::azure_max_attempts) {
212     bool is_container;
213     RETURN_NOT_OK(this->is_container(container_name, &is_container));
214 
215     if (is_container) {
216       return Status::Ok();
217     }
218 
219     std::this_thread::sleep_for(
220         std::chrono::milliseconds(constants::azure_attempt_sleep_ms));
221   }
222 
223   return LOG_STATUS(Status::AzureError(std::string(
224       "Timed out waiting on container to propogate: " + container_name)));
225 }
226 
wait_for_container_to_be_deleted(const std::string & container_name) const227 Status Azure::wait_for_container_to_be_deleted(
228     const std::string& container_name) const {
229   assert(client_);
230 
231   unsigned attempts = 0;
232   while (attempts++ < constants::azure_max_attempts) {
233     bool is_container;
234     RETURN_NOT_OK(this->is_container(container_name, &is_container));
235     if (!is_container) {
236       return Status::Ok();
237     }
238 
239     std::this_thread::sleep_for(
240         std::chrono::milliseconds(constants::azure_attempt_sleep_ms));
241   }
242 
243   return LOG_STATUS(Status::AzureError(std::string(
244       "Timed out waiting on container to be deleted: " + container_name)));
245 }
246 
empty_container(const URI & container) const247 Status Azure::empty_container(const URI& container) const {
248   assert(client_);
249 
250   return remove_dir(container);
251 }
252 
flush_blob(const URI & uri)253 Status Azure::flush_blob(const URI& uri) {
254   assert(client_);
255 
256   if (!use_block_list_upload_) {
257     return flush_blob_direct(uri);
258   }
259 
260   if (!uri.is_azure()) {
261     return LOG_STATUS(Status::AzureError(
262         std::string("URI is not an Azure URI: " + uri.to_string())));
263   }
264 
265   Buffer* const write_cache_buffer = get_write_cache_buffer(uri.to_string());
266 
267   const Status flush_write_cache_st =
268       flush_write_cache(uri, write_cache_buffer, true);
269 
270   std::unique_lock<std::mutex> states_lock(block_list_upload_states_lock_);
271 
272   if (block_list_upload_states_.count(uri.to_string()) == 0) {
273     return flush_write_cache_st;
274   }
275 
276   BlockListUploadState* const state =
277       &block_list_upload_states_.at(uri.to_string());
278 
279   states_lock.unlock();
280 
281   std::string container_name;
282   std::string blob_path;
283   RETURN_NOT_OK(parse_azure_uri(uri, &container_name, &blob_path));
284 
285   if (!state->st().ok()) {
286     // Save the return status because 'state' will be freed before we return.
287     const Status st = state->st();
288 
289     // Unlike S3 that can abort a chunked upload to immediately release
290     // uncommited chunks and leave the original object unmodified, the
291     // only way to do this on Azure is by some form of a write. We must
292     // either:
293     // 1. Delete the blob
294     // 2. Overwrite the blob with a zero-length buffer.
295     //
296     // Alternatively, we could do nothing and let Azure release the
297     // uncommited blocks ~7 days later. We chose to delete the blob
298     // as a best-effort operation. We intentionally are ignoring the
299     // returned Status from 'remove_blob'.
300     remove_blob(uri);
301 
302     // Release all instance state associated with this block list
303     // transactions.
304     finish_block_list_upload(uri);
305 
306     return st;
307   }
308 
309   // Build the block list to commit.
310   const std::list<std::string> block_ids = state->get_block_ids();
311   std::vector<azure::storage_lite::put_block_list_request_base::block_item>
312       block_list;
313   block_list.reserve(block_ids.size());
314   for (const auto& block_id : state->get_block_ids()) {
315     azure::storage_lite::put_block_list_request_base::block_item block;
316     block.id = block_id;
317     block.type = azure::storage_lite::put_block_list_request_base::block_type::
318         uncommitted;
319     block_list.emplace_back(block);
320   }
321 
322   // We do not store any custom metadata with the blob.
323   std::vector<std::pair<std::string, std::string>> empty_metadata;
324 
325   // Release all instance state associated with this block list
326   // transactions so that we can safely return if the following
327   // request failed.
328   finish_block_list_upload(uri);
329 
330   std::future<azure::storage_lite::storage_outcome<void>> result =
331       client_->put_block_list(
332           container_name, blob_path, block_list, empty_metadata);
333   if (!result.valid()) {
334     return LOG_STATUS(Status::AzureError(
335         std::string("Flush blob failed on: " + uri.to_string())));
336   }
337 
338   azure::storage_lite::storage_outcome<void> outcome = result.get();
339   if (!outcome.success()) {
340     return LOG_STATUS(Status::AzureError(
341         std::string("Flush blob failed on: " + uri.to_string())));
342   }
343 
344   return wait_for_blob_to_propagate(container_name, blob_path);
345 }
346 
finish_block_list_upload(const URI & uri)347 void Azure::finish_block_list_upload(const URI& uri) {
348   // Protect 'block_list_upload_states_' from multiple writers.
349   std::unique_lock<std::mutex> states_lock(block_list_upload_states_lock_);
350   block_list_upload_states_.erase(uri.to_string());
351   states_lock.unlock();
352 
353   // Protect 'write_cache_map_' from multiple writers.
354   std::unique_lock<std::mutex> cache_lock(write_cache_map_lock_);
355   write_cache_map_.erase(uri.to_string());
356   cache_lock.unlock();
357 }
358 
flush_blob_direct(const URI & uri)359 Status Azure::flush_blob_direct(const URI& uri) {
360   if (!uri.is_azure()) {
361     return LOG_STATUS(Status::AzureError(
362         std::string("URI is not an Azure URI: " + uri.to_string())));
363   }
364 
365   Buffer* const write_cache_buffer = get_write_cache_buffer(uri.to_string());
366 
367   if (write_cache_buffer->size() == 0) {
368     return Status::Ok();
369   }
370 
371   std::string container_name;
372   std::string blob_path;
373   RETURN_NOT_OK(parse_azure_uri(uri, &container_name, &blob_path));
374 
375   // We do not store any custom metadata with the blob.
376   std::vector<std::pair<std::string, std::string>> empty_metadata;
377 
378   // Unlike the 'upload_block_from_buffer' interface used in
379   // the block list upload path, there is not an interface to
380   // upload a single blob with a buffer. There is only
381   // 'upload_block_blob_from_stream'. Here, we construct a
382   // zero-copy stream buffer.
383   ZeroCopyStreamBuffer zc_stream_buffer(
384       static_cast<char*>(write_cache_buffer->data()),
385       write_cache_buffer->size());
386   std::istream zc_istream(&zc_stream_buffer);
387 
388   std::future<azure::storage_lite::storage_outcome<void>> result =
389       client_->upload_block_blob_from_stream(
390           container_name,
391           blob_path,
392           zc_istream,
393           empty_metadata,
394           write_cache_buffer->size());
395   if (!result.valid()) {
396     return LOG_STATUS(Status::AzureError(
397         std::string("Flush blob failed on: " + uri.to_string())));
398   }
399 
400   azure::storage_lite::storage_outcome<void> outcome = result.get();
401   if (!outcome.success()) {
402     return LOG_STATUS(Status::AzureError(
403         std::string("Flush blob failed on: " + uri.to_string())));
404   }
405 
406   // Protect 'write_cache_map_' from multiple writers.
407   std::unique_lock<std::mutex> cache_lock(write_cache_map_lock_);
408   write_cache_map_.erase(uri.to_string());
409   cache_lock.unlock();
410 
411   return wait_for_blob_to_propagate(container_name, blob_path);
412 }
413 
is_empty_container(const URI & uri,bool * is_empty) const414 Status Azure::is_empty_container(const URI& uri, bool* is_empty) const {
415   assert(client_);
416   assert(is_empty);
417 
418   if (!uri.is_azure()) {
419     return LOG_STATUS(Status::AzureError(
420         std::string("URI is not an Azure URI: " + uri.to_string())));
421   }
422 
423   std::string container_name;
424   RETURN_NOT_OK(parse_azure_uri(uri, &container_name, nullptr));
425 
426   std::future<azure::storage_lite::storage_outcome<
427       azure::storage_lite::list_blobs_segmented_response>>
428       result = client_->list_blobs_segmented(container_name, "", "", "", 1);
429   if (!result.valid()) {
430     return LOG_STATUS(Status::AzureError(
431         std::string("List blobs failed on: " + uri.to_string())));
432   }
433 
434   azure::storage_lite::storage_outcome<
435       azure::storage_lite::list_blobs_segmented_response>
436       outcome = result.get();
437   if (!outcome.success()) {
438     return LOG_STATUS(Status::AzureError(
439         std::string("List blobs failed on: " + uri.to_string())));
440   }
441 
442   azure::storage_lite::list_blobs_segmented_response response =
443       outcome.response();
444 
445   *is_empty = response.blobs.empty();
446 
447   return Status::Ok();
448 }
449 
is_container(const URI & uri,bool * const is_container) const450 Status Azure::is_container(const URI& uri, bool* const is_container) const {
451   assert(is_container);
452 
453   if (!uri.is_azure()) {
454     return LOG_STATUS(Status::AzureError(
455         std::string("URI is not an Azure URI: " + uri.to_string())));
456   }
457 
458   std::string container_name;
459   RETURN_NOT_OK(parse_azure_uri(uri, &container_name, nullptr));
460 
461   return this->is_container(container_name, is_container);
462 }
463 
is_container(const std::string & container_name,bool * const is_container) const464 Status Azure::is_container(
465     const std::string& container_name, bool* const is_container) const {
466   assert(client_);
467   assert(is_container);
468 
469   std::future<azure::storage_lite::storage_outcome<
470       azure::storage_lite::container_property>>
471       result = client_->get_container_properties(container_name);
472   if (!result.valid()) {
473     return LOG_STATUS(Status::AzureError(
474         std::string("Get container properties failed on: " + container_name)));
475   }
476 
477   azure::storage_lite::storage_outcome<azure::storage_lite::container_property>
478       outcome = result.get();
479   if (!outcome.success()) {
480     *is_container = false;
481     return Status::Ok();
482   }
483 
484   azure::storage_lite::container_property response = outcome.response();
485 
486   *is_container = response.valid();
487   return Status::Ok();
488 }
489 
is_dir(const URI & uri,bool * const exists) const490 Status Azure::is_dir(const URI& uri, bool* const exists) const {
491   assert(client_);
492   assert(exists);
493 
494   std::vector<std::string> paths;
495   RETURN_NOT_OK(ls(uri, &paths, "/", 1));
496   *exists = (bool)paths.size();
497   return Status::Ok();
498 }
499 
is_blob(const URI & uri,bool * const is_blob) const500 Status Azure::is_blob(const URI& uri, bool* const is_blob) const {
501   assert(is_blob);
502 
503   std::string container_name;
504   std::string blob_path;
505   RETURN_NOT_OK(parse_azure_uri(uri, &container_name, &blob_path));
506 
507   return this->is_blob(container_name, blob_path, is_blob);
508 }
509 
is_blob(const std::string & container_name,const std::string & blob_path,bool * const is_blob) const510 Status Azure::is_blob(
511     const std::string& container_name,
512     const std::string& blob_path,
513     bool* const is_blob) const {
514   assert(client_);
515   assert(is_blob);
516 
517   std::future<
518       azure::storage_lite::storage_outcome<azure::storage_lite::blob_property>>
519       result = client_->get_blob_properties(container_name, blob_path);
520   if (!result.valid()) {
521     return LOG_STATUS(Status::AzureError(
522         std::string("Get blob properties failed on: " + blob_path)));
523   }
524 
525   azure::storage_lite::storage_outcome<azure::storage_lite::blob_property>
526       outcome = result.get();
527   if (!outcome.success()) {
528     *is_blob = false;
529     return Status::Ok();
530   }
531 
532   azure::storage_lite::blob_property response = outcome.response();
533 
534   *is_blob = response.valid();
535   return Status::Ok();
536 }
537 
remove_front_slash(const std::string & path) const538 std::string Azure::remove_front_slash(const std::string& path) const {
539   if (path.front() == '/') {
540     return path.substr(1, path.length());
541   }
542 
543   return path;
544 }
545 
add_trailing_slash(const std::string & path) const546 std::string Azure::add_trailing_slash(const std::string& path) const {
547   if (path.back() != '/') {
548     return path + "/";
549   }
550 
551   return path;
552 }
553 
remove_trailing_slash(const std::string & path) const554 std::string Azure::remove_trailing_slash(const std::string& path) const {
555   if (path.back() == '/') {
556     return path.substr(0, path.length() - 1);
557   }
558 
559   return path;
560 }
561 
ls(const URI & uri,std::vector<std::string> * paths,const std::string & delimiter,const int max_paths) const562 Status Azure::ls(
563     const URI& uri,
564     std::vector<std::string>* paths,
565     const std::string& delimiter,
566     const int max_paths) const {
567   assert(client_);
568   assert(paths);
569 
570   const URI uri_dir = uri.add_trailing_slash();
571 
572   if (!uri_dir.is_azure()) {
573     return LOG_STATUS(Status::AzureError(
574         std::string("URI is not an Azure URI: " + uri_dir.to_string())));
575   }
576 
577   std::string container_name;
578   std::string blob_path;
579   RETURN_NOT_OK(parse_azure_uri(uri_dir, &container_name, &blob_path));
580 
581   std::string continuation_token = "";
582   do {
583     std::future<azure::storage_lite::storage_outcome<
584         azure::storage_lite::list_blobs_segmented_response>>
585         result = client_->list_blobs_segmented(
586             container_name,
587             delimiter,
588             continuation_token,
589             blob_path,
590             max_paths > 0 ? max_paths : 5000);
591     if (!result.valid()) {
592       return LOG_STATUS(Status::AzureError(
593           std::string("List blobs failed on: " + uri_dir.to_string())));
594     }
595 
596     azure::storage_lite::storage_outcome<
597         azure::storage_lite::list_blobs_segmented_response>
598         outcome = result.get();
599     if (!outcome.success()) {
600       return LOG_STATUS(Status::AzureError(
601           std::string("List blobs failed on: " + uri_dir.to_string())));
602     }
603 
604     azure::storage_lite::list_blobs_segmented_response response =
605         outcome.response();
606 
607     for (const auto& blob : response.blobs) {
608       paths->emplace_back(
609           "azure://" + container_name + "/" +
610           remove_front_slash(remove_trailing_slash(blob.name)));
611     }
612 
613     continuation_token = response.next_marker;
614   } while (!continuation_token.empty());
615 
616   return Status::Ok();
617 }
618 
move_object(const URI & old_uri,const URI & new_uri)619 Status Azure::move_object(const URI& old_uri, const URI& new_uri) {
620   assert(client_);
621   RETURN_NOT_OK(copy_blob(old_uri, new_uri));
622   RETURN_NOT_OK(remove_blob(old_uri));
623   return Status::Ok();
624 }
625 
copy_blob(const URI & old_uri,const URI & new_uri)626 Status Azure::copy_blob(const URI& old_uri, const URI& new_uri) {
627   assert(client_);
628 
629   if (!old_uri.is_azure()) {
630     return LOG_STATUS(Status::AzureError(
631         std::string("URI is not an Azure URI: " + old_uri.to_string())));
632   }
633 
634   if (!new_uri.is_azure()) {
635     return LOG_STATUS(Status::AzureError(
636         std::string("URI is not an Azure URI: " + new_uri.to_string())));
637   }
638 
639   std::string old_container_name;
640   std::string old_blob_path;
641   RETURN_NOT_OK(parse_azure_uri(old_uri, &old_container_name, &old_blob_path));
642 
643   std::string new_container_name;
644   std::string new_blob_path;
645   RETURN_NOT_OK(parse_azure_uri(new_uri, &new_container_name, &new_blob_path));
646 
647   std::future<azure::storage_lite::storage_outcome<void>> result =
648       client_->start_copy(
649           old_container_name, old_blob_path, new_container_name, new_blob_path);
650   if (!result.valid()) {
651     return LOG_STATUS(Status::AzureError(
652         std::string("Copy blob failed on: " + old_uri.to_string())));
653   }
654 
655   azure::storage_lite::storage_outcome<void> outcome = result.get();
656   if (!outcome.success()) {
657     return LOG_STATUS(Status::AzureError(
658         std::string("Copy blob failed on: " + old_uri.to_string())));
659   }
660 
661   return wait_for_blob_to_propagate(new_container_name, new_blob_path);
662 }
663 
wait_for_blob_to_propagate(const std::string & container_name,const std::string & blob_path) const664 Status Azure::wait_for_blob_to_propagate(
665     const std::string& container_name, const std::string& blob_path) const {
666   assert(client_);
667 
668   unsigned attempts = 0;
669   while (attempts++ < constants::azure_max_attempts) {
670     bool is_blob;
671     RETURN_NOT_OK(this->is_blob(container_name, blob_path, &is_blob));
672     if (is_blob) {
673       return Status::Ok();
674     }
675 
676     std::this_thread::sleep_for(
677         std::chrono::milliseconds(constants::azure_attempt_sleep_ms));
678   }
679 
680   return LOG_STATUS(Status::AzureError(
681       std::string("Timed out waiting on blob to propogate: " + blob_path)));
682 }
683 
wait_for_blob_to_be_deleted(const std::string & container_name,const std::string & blob_path) const684 Status Azure::wait_for_blob_to_be_deleted(
685     const std::string& container_name, const std::string& blob_path) const {
686   assert(client_);
687 
688   unsigned attempts = 0;
689   while (attempts++ < constants::azure_max_attempts) {
690     bool is_blob;
691     RETURN_NOT_OK(this->is_blob(container_name, blob_path, &is_blob));
692     if (!is_blob) {
693       return Status::Ok();
694     }
695 
696     std::this_thread::sleep_for(
697         std::chrono::milliseconds(constants::azure_attempt_sleep_ms));
698   }
699 
700   return LOG_STATUS(Status::AzureError(
701       std::string("Timed out waiting on blob to be deleted: " + blob_path)));
702 }
703 
move_dir(const URI & old_uri,const URI & new_uri)704 Status Azure::move_dir(const URI& old_uri, const URI& new_uri) {
705   assert(client_);
706 
707   std::vector<std::string> paths;
708   RETURN_NOT_OK(ls(old_uri, &paths, ""));
709   for (const auto& path : paths) {
710     const std::string suffix = path.substr(old_uri.to_string().size());
711     const URI new_path = new_uri.join_path(suffix);
712     RETURN_NOT_OK(move_object(URI(path), new_path));
713   }
714   return Status::Ok();
715 }
716 
blob_size(const URI & uri,uint64_t * const nbytes) const717 Status Azure::blob_size(const URI& uri, uint64_t* const nbytes) const {
718   assert(client_);
719   assert(nbytes);
720 
721   if (!uri.is_azure()) {
722     return LOG_STATUS(Status::AzureError(
723         std::string("URI is not an Azure URI: " + uri.to_string())));
724   }
725 
726   std::string container_name;
727   std::string blob_path;
728   RETURN_NOT_OK(parse_azure_uri(uri, &container_name, &blob_path));
729 
730   std::future<azure::storage_lite::storage_outcome<
731       azure::storage_lite::list_blobs_segmented_response>>
732       result =
733           client_->list_blobs_segmented(container_name, "", "", blob_path, 1);
734   if (!result.valid()) {
735     return LOG_STATUS(Status::AzureError(
736         std::string("Get blob size failed on: " + uri.to_string())));
737   }
738 
739   azure::storage_lite::storage_outcome<
740       azure::storage_lite::list_blobs_segmented_response>
741       outcome = result.get();
742   if (!outcome.success()) {
743     return LOG_STATUS(Status::AzureError(
744         std::string("Get blob size failed on: " + uri.to_string())));
745   }
746 
747   azure::storage_lite::list_blobs_segmented_response response =
748       outcome.response();
749 
750   if (response.blobs.empty()) {
751     return LOG_STATUS(Status::AzureError(
752         std::string("Get blob size failed on: " + uri.to_string())));
753   }
754 
755   const azure::storage_lite::list_blobs_segmented_item& blob =
756       response.blobs[0];
757 
758   *nbytes = blob.content_length;
759 
760   return Status::Ok();
761 }
762 
read(const URI & uri,const off_t offset,void * const buffer,const uint64_t length,const uint64_t read_ahead_length,uint64_t * const length_returned) const763 Status Azure::read(
764     const URI& uri,
765     const off_t offset,
766     void* const buffer,
767     const uint64_t length,
768     const uint64_t read_ahead_length,
769     uint64_t* const length_returned) const {
770   assert(client_);
771 
772   if (!uri.is_azure()) {
773     return LOG_STATUS(Status::AzureError(
774         std::string("URI is not an Azure URI: " + uri.to_string())));
775   }
776 
777   std::string container_name;
778   std::string blob_path;
779   RETURN_NOT_OK(parse_azure_uri(uri, &container_name, &blob_path));
780 
781   std::stringstream ss;
782   std::future<azure::storage_lite::storage_outcome<void>> result =
783       client_->download_blob_to_stream(
784           container_name, blob_path, offset, length + read_ahead_length, ss);
785   if (!result.valid()) {
786     return LOG_STATUS(Status::AzureError(
787         std::string("Read blob failed on: " + uri.to_string())));
788   }
789 
790   azure::storage_lite::storage_outcome<void> outcome = result.get();
791   if (!outcome.success()) {
792     return LOG_STATUS(Status::AzureError(
793         std::string("Read blob failed on: " + uri.to_string())));
794   }
795 
796   ss.read(static_cast<char*>(buffer), length + read_ahead_length);
797   *length_returned = ss.gcount();
798 
799   if (*length_returned < length) {
800     return LOG_STATUS(Status::AzureError(
801         std::string("Read operation read unexpected number of bytes.")));
802   }
803 
804   return Status::Ok();
805 }
806 
remove_container(const URI & uri) const807 Status Azure::remove_container(const URI& uri) const {
808   assert(client_);
809 
810   // Empty container
811   RETURN_NOT_OK(empty_container(uri));
812 
813   std::string container_name;
814   RETURN_NOT_OK(parse_azure_uri(uri, &container_name, nullptr));
815 
816   std::future<azure::storage_lite::storage_outcome<void>> result =
817       client_->delete_container(container_name);
818   if (!result.valid()) {
819     return LOG_STATUS(Status::AzureError(
820         std::string("Remove container failed on: " + uri.to_string())));
821   }
822 
823   azure::storage_lite::storage_outcome<void> outcome = result.get();
824   if (!outcome.success()) {
825     return LOG_STATUS(Status::AzureError(
826         std::string("Remove container failed on: " + uri.to_string())));
827   }
828 
829   return wait_for_container_to_be_deleted(container_name);
830 }
831 
remove_blob(const URI & uri) const832 Status Azure::remove_blob(const URI& uri) const {
833   assert(client_);
834 
835   std::string container_name;
836   std::string blob_path;
837   RETURN_NOT_OK(parse_azure_uri(uri, &container_name, &blob_path));
838 
839   std::future<azure::storage_lite::storage_outcome<void>> result =
840       client_->delete_blob(
841           container_name, blob_path, false /* delete_snapshots */);
842   if (!result.valid()) {
843     return LOG_STATUS(Status::AzureError(
844         std::string("Remove blob failed on: " + uri.to_string())));
845   }
846 
847   azure::storage_lite::storage_outcome<void> outcome = result.get();
848   if (!outcome.success()) {
849     return LOG_STATUS(Status::AzureError(
850         std::string("Remove blob failed on: " + uri.to_string())));
851   }
852 
853   return wait_for_blob_to_be_deleted(container_name, blob_path);
854 }
855 
remove_dir(const URI & uri) const856 Status Azure::remove_dir(const URI& uri) const {
857   assert(client_);
858 
859   std::vector<std::string> paths;
860   RETURN_NOT_OK(ls(uri, &paths, ""));
861   for (const auto& path : paths) {
862     RETURN_NOT_OK(remove_blob(URI(path)));
863   }
864 
865   return Status::Ok();
866 }
867 
touch(const URI & uri) const868 Status Azure::touch(const URI& uri) const {
869   assert(client_);
870 
871   if (!uri.is_azure()) {
872     return LOG_STATUS(Status::AzureError(
873         std::string("URI is not an Azure URI: " + uri.to_string())));
874   }
875 
876   if (uri.to_string().back() == '/') {
877     return LOG_STATUS(Status::AzureError(std::string(
878         "Cannot create file; URI is a directory: " + uri.to_string())));
879   }
880 
881   bool is_blob;
882   RETURN_NOT_OK(this->is_blob(uri, &is_blob));
883   if (is_blob) {
884     return Status::Ok();
885   }
886 
887   std::string container_name;
888   std::string blob_path;
889   RETURN_NOT_OK(parse_azure_uri(uri, &container_name, &blob_path));
890 
891   std::stringstream empty_ss;
892   std::vector<std::pair<std::string, std::string>> empty_metadata;
893   std::future<azure::storage_lite::storage_outcome<void>> result =
894       client_->upload_block_blob_from_stream(
895           container_name, blob_path, empty_ss, empty_metadata);
896   if (!result.valid()) {
897     return LOG_STATUS(Status::AzureError(
898         std::string("Touch blob failed on: " + uri.to_string())));
899   }
900 
901   azure::storage_lite::storage_outcome<void> outcome = result.get();
902   if (!outcome.success()) {
903     return LOG_STATUS(Status::AzureError(
904         std::string("Touch blob failed on: " + uri.to_string())));
905   }
906 
907   return Status::Ok();
908 }
909 
write(const URI & uri,const void * const buffer,const uint64_t length)910 Status Azure::write(
911     const URI& uri, const void* const buffer, const uint64_t length) {
912   if (!uri.is_azure()) {
913     return LOG_STATUS(Status::AzureError(
914         std::string("URI is not an Azure URI: " + uri.to_string())));
915   }
916 
917   Buffer* const write_cache_buffer = get_write_cache_buffer(uri.to_string());
918 
919   uint64_t nbytes_filled;
920   RETURN_NOT_OK(
921       fill_write_cache(write_cache_buffer, buffer, length, &nbytes_filled));
922 
923   if (!use_block_list_upload_) {
924     if (nbytes_filled != length) {
925       std::stringstream errmsg;
926       errmsg << "Direct write failed! " << nbytes_filled
927              << " bytes written to buffer, " << length << " bytes requested.";
928       return LOG_STATUS(Status::AzureError(errmsg.str()));
929     } else {
930       return Status::Ok();
931     }
932   }
933 
934   if (write_cache_buffer->size() == write_cache_max_size_) {
935     RETURN_NOT_OK(flush_write_cache(uri, write_cache_buffer, false));
936   }
937 
938   uint64_t new_length = length - nbytes_filled;
939   uint64_t offset = nbytes_filled;
940   while (new_length > 0) {
941     if (new_length >= write_cache_max_size_) {
942       RETURN_NOT_OK(write_blocks(
943           uri,
944           static_cast<const char*>(buffer) + offset,
945           write_cache_max_size_,
946           false));
947       offset += write_cache_max_size_;
948       new_length -= write_cache_max_size_;
949     } else {
950       RETURN_NOT_OK(fill_write_cache(
951           write_cache_buffer,
952           static_cast<const char*>(buffer) + offset,
953           new_length,
954           &nbytes_filled));
955       offset += nbytes_filled;
956       new_length -= nbytes_filled;
957     }
958   }
959 
960   assert(offset == length);
961 
962   return Status::Ok();
963 }
964 
get_write_cache_buffer(const std::string & uri)965 Buffer* Azure::get_write_cache_buffer(const std::string& uri) {
966   std::unique_lock<std::mutex> map_lock(write_cache_map_lock_);
967   if (write_cache_map_.count(uri) > 0) {
968     return &write_cache_map_.at(uri);
969   } else {
970     return &write_cache_map_[uri];
971   }
972 }
973 
fill_write_cache(Buffer * const write_cache_buffer,const void * const buffer,const uint64_t length,uint64_t * const nbytes_filled)974 Status Azure::fill_write_cache(
975     Buffer* const write_cache_buffer,
976     const void* const buffer,
977     const uint64_t length,
978     uint64_t* const nbytes_filled) {
979   assert(write_cache_buffer);
980   assert(buffer);
981   assert(nbytes_filled);
982 
983   *nbytes_filled =
984       std::min(write_cache_max_size_ - write_cache_buffer->size(), length);
985 
986   if (*nbytes_filled > 0) {
987     RETURN_NOT_OK(write_cache_buffer->write(buffer, *nbytes_filled));
988   }
989 
990   return Status::Ok();
991 }
992 
flush_write_cache(const URI & uri,Buffer * const write_cache_buffer,const bool last_block)993 Status Azure::flush_write_cache(
994     const URI& uri, Buffer* const write_cache_buffer, const bool last_block) {
995   assert(write_cache_buffer);
996 
997   if (write_cache_buffer->size() > 0) {
998     const Status st = write_blocks(
999         uri,
1000         write_cache_buffer->data(),
1001         write_cache_buffer->size(),
1002         last_block);
1003     write_cache_buffer->reset_size();
1004     RETURN_NOT_OK(st);
1005   }
1006 
1007   return Status::Ok();
1008 }
1009 
write_blocks(const URI & uri,const void * const buffer,const uint64_t length,const bool last_block)1010 Status Azure::write_blocks(
1011     const URI& uri,
1012     const void* const buffer,
1013     const uint64_t length,
1014     const bool last_block) {
1015   if (!uri.is_azure()) {
1016     return LOG_STATUS(Status::AzureError(
1017         std::string("URI is not an Azure URI: " + uri.to_string())));
1018   }
1019 
1020   // Ensure that each thread is responsible for exactly block_list_block_size_
1021   // bytes (except if this is the last block, in which case the final
1022   // thread should write less). Cap the number of parallel operations at the
1023   // configured max number. Length must be evenly divisible by
1024   // block_list_block_size_ unless this is the last block.
1025   uint64_t num_ops = last_block ?
1026                          utils::math::ceil(length, block_list_block_size_) :
1027                          (length / block_list_block_size_);
1028   num_ops = std::min(std::max(num_ops, uint64_t(1)), max_parallel_ops_);
1029 
1030   if (!last_block && length % block_list_block_size_ != 0) {
1031     return LOG_STATUS(
1032         Status::AzureError("Length not evenly divisible by block size"));
1033   }
1034 
1035   // Protect 'block_list_upload_states_' from concurrent read and writes.
1036   std::unique_lock<std::mutex> states_lock(block_list_upload_states_lock_);
1037 
1038   auto state_iter = block_list_upload_states_.find(uri.to_string());
1039   if (state_iter == block_list_upload_states_.end()) {
1040     // Delete file if it exists (overwrite).
1041     bool exists;
1042     RETURN_NOT_OK(is_blob(uri, &exists));
1043     if (exists) {
1044       RETURN_NOT_OK(remove_blob(uri));
1045     }
1046 
1047     // Instantiate the new state.
1048     BlockListUploadState state;
1049 
1050     // Store the new state.
1051     const std::pair<
1052         std::unordered_map<std::string, BlockListUploadState>::iterator,
1053         bool>
1054         emplaced = block_list_upload_states_.emplace(
1055             uri.to_string(), std::move(state));
1056     assert(emplaced.second);
1057     state_iter = emplaced.first;
1058   }
1059 
1060   BlockListUploadState* const state = &state_iter->second;
1061 
1062   // We're done reading and writing from 'block_list_upload_states_'. Mutating
1063   // the 'state' element does not affect the thread-safety of
1064   // 'block_list_upload_states_'.
1065   states_lock.unlock();
1066 
1067   std::string container_name;
1068   std::string blob_path;
1069   RETURN_NOT_OK(parse_azure_uri(uri, &container_name, &blob_path));
1070 
1071   if (num_ops == 1) {
1072     const std::string block_id = state->next_block_id();
1073 
1074     const Status st =
1075         upload_block(container_name, blob_path, buffer, length, block_id);
1076     state->update_st(st);
1077     return st;
1078   } else {
1079     std::vector<ThreadPool::Task> tasks;
1080     tasks.reserve(num_ops);
1081     for (uint64_t i = 0; i < num_ops; i++) {
1082       const uint64_t begin = i * block_list_block_size_;
1083       const uint64_t end =
1084           std::min((i + 1) * block_list_block_size_ - 1, length - 1);
1085       const char* const thread_buffer =
1086           reinterpret_cast<const char*>(buffer) + begin;
1087       const uint64_t thread_buffer_len = end - begin + 1;
1088       const std::string block_id = state->next_block_id();
1089 
1090       std::function<Status()> upload_block_fn = std::bind(
1091           &Azure::upload_block,
1092           this,
1093           container_name,
1094           blob_path,
1095           thread_buffer,
1096           thread_buffer_len,
1097           block_id);
1098       ThreadPool::Task task = thread_pool_->execute(std::move(upload_block_fn));
1099       tasks.emplace_back(std::move(task));
1100     }
1101 
1102     const Status st = thread_pool_->wait_all(tasks);
1103     state->update_st(st);
1104     return st;
1105   }
1106 
1107   return Status::Ok();
1108 }
1109 
upload_block(const std::string & container_name,const std::string & blob_path,const void * const buffer,const uint64_t length,const std::string & block_id)1110 Status Azure::upload_block(
1111     const std::string& container_name,
1112     const std::string& blob_path,
1113     const void* const buffer,
1114     const uint64_t length,
1115     const std::string& block_id) {
1116   // The 'const_cast' is necessary because the SDK API requires a
1117   // non-const 'buffer'. However, this is safe because the SDK does
1118   // not actually mutate 'buffer'.
1119   //
1120   // This may removed once the following PR is merged and released:
1121   // https://github.com/Azure/azure-storage-cpplite/pull/64
1122   std::future<azure::storage_lite::storage_outcome<void>> result =
1123       client_->upload_block_from_buffer(
1124           container_name,
1125           blob_path,
1126           block_id,
1127           const_cast<char*>(static_cast<const char*>(buffer)),
1128           length);
1129 
1130   if (!result.valid()) {
1131     return LOG_STATUS(Status::AzureError(
1132         std::string("Upload block failed on: " + blob_path)));
1133   }
1134 
1135   azure::storage_lite::storage_outcome<void> outcome = result.get();
1136   if (!outcome.success()) {
1137     return LOG_STATUS(Status::AzureError(
1138         std::string("Upload block failed on: " + blob_path)));
1139   }
1140 
1141   return Status::Ok();
1142 }
1143 
parse_azure_uri(const URI & uri,std::string * const container_name,std::string * const blob_path) const1144 Status Azure::parse_azure_uri(
1145     const URI& uri,
1146     std::string* const container_name,
1147     std::string* const blob_path) const {
1148   assert(uri.is_azure());
1149   const std::string uri_str = uri.to_string();
1150 
1151   const static std::string azure_prefix = "azure://";
1152   assert(uri_str.rfind(azure_prefix, 0) == 0);
1153 
1154   if (uri_str.size() == azure_prefix.size()) {
1155     if (container_name)
1156       *container_name = "";
1157     if (blob_path)
1158       *blob_path = "";
1159     return Status::Ok();
1160   }
1161 
1162   // Find the '/' after the container name.
1163   const size_t separator = uri_str.find('/', azure_prefix.size() + 1);
1164 
1165   // There is only a container name if there isn't a separating slash.
1166   if (separator == std::string::npos) {
1167     const size_t c_pos_start = azure_prefix.size();
1168     const size_t c_pos_end = uri_str.size();
1169     if (container_name)
1170       *container_name = uri_str.substr(c_pos_start, c_pos_end - c_pos_start);
1171     if (blob_path)
1172       *blob_path = "";
1173     return Status::Ok();
1174   }
1175 
1176   // There is only a container name if there aren't any characters past the
1177   // separating slash.
1178   if (uri_str.size() == separator) {
1179     const size_t c_pos_start = azure_prefix.size();
1180     const size_t c_pos_end = separator;
1181     if (container_name)
1182       *container_name = uri_str.substr(c_pos_start, c_pos_end - c_pos_start);
1183     if (blob_path)
1184       *blob_path = "";
1185     return Status::Ok();
1186   }
1187 
1188   const size_t c_pos_start = azure_prefix.size();
1189   const size_t c_pos_end = separator;
1190   const size_t b_pos_start = separator + 1;
1191   const size_t b_pos_end = uri_str.size();
1192 
1193   if (container_name)
1194     *container_name = uri_str.substr(c_pos_start, c_pos_end - c_pos_start);
1195   if (blob_path)
1196     *blob_path = uri_str.substr(b_pos_start, b_pos_end - b_pos_start);
1197 
1198   return Status::Ok();
1199 }
1200 
1201 }  // namespace sm
1202 }  // namespace tiledb
1203 
1204 #endif
1205