1 /**
2 * @file azure.cc
3 *
4 * @section LICENSE
5 *
6 * The MIT License
7 *
8 * @copyright Copyright (c) 2017-2021 TileDB, Inc.
9 *
10 * Permission is hereby granted, free of charge, to any person obtaining a copy
11 * of this software and associated documentation files (the "Software"), to deal
12 * in the Software without restriction, including without limitation the rights
13 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
14 * copies of the Software, and to permit persons to whom the Software is
15 * furnished to do so, subject to the following conditions:
16 *
17 * The above copyright notice and this permission notice shall be included in
18 * all copies or substantial portions of the Software.
19 *
20 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
21 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
22 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
23 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
24 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
25 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
26 * THE SOFTWARE.
27 *
28 * @section DESCRIPTION
29 *
30 * This file implements the Azure class.
31 */
32
33 #ifdef HAVE_AZURE
34
35 #if !defined(NOMINMAX)
36 #define NOMINMAX // avoid min/max macros from windows headers
37 #endif
38
39 #include <put_block_list_request_base.h>
40 #include <future>
41
42 #include "tiledb/common/logger.h"
43 #include "tiledb/sm/filesystem/azure.h"
44 #include "tiledb/sm/global_state/global_state.h"
45 #include "tiledb/sm/misc/utils.h"
46
47 using namespace tiledb::common;
48
49 namespace tiledb {
50 namespace sm {
51
52 /* ********************************* */
53 /* CONSTRUCTORS & DESTRUCTORS */
54 /* ********************************* */
55
/**
 * Default constructor.
 *
 * Only seeds the numeric/boolean configuration members with placeholder
 * values; the blob client is not created here. `init()` overwrites these
 * members from the configuration and constructs the client.
 */
Azure::Azure()
    : write_cache_max_size_(0)
    , max_parallel_ops_(1)
    , block_list_block_size_(0)
    , use_block_list_upload_(false) {
}
62
/** Destructor. The body is empty; members release themselves. */
Azure::~Azure() {
}
65
66 /* ********************************* */
67 /* API */
68 /* ********************************* */
69
init(const Config & config,ThreadPool * const thread_pool)70 Status Azure::init(const Config& config, ThreadPool* const thread_pool) {
71 if (thread_pool == nullptr) {
72 return LOG_STATUS(
73 Status::AzureError("Can't initialize with null thread pool."));
74 }
75
76 thread_pool_ = thread_pool;
77
78 bool found;
79 char* tmp = NULL;
80
81 std::string account_name =
82 config.get("vfs.azure.storage_account_name", &found);
83 assert(found);
84 if (account_name.empty() &&
85 ((tmp = getenv("AZURE_STORAGE_ACCOUNT")) != NULL)) {
86 account_name = std::string(tmp);
87 }
88
89 std::string account_key = config.get("vfs.azure.storage_account_key", &found);
90 assert(found);
91 if (account_key.empty() && ((tmp = getenv("AZURE_STORAGE_KEY")) != NULL)) {
92 account_key = std::string(getenv("AZURE_STORAGE_KEY"));
93 }
94
95 std::string sas_token = config.get("vfs.azure.storage_sas_token", &found);
96 assert(found);
97 if (sas_token.empty() &&
98 ((tmp = getenv("AZURE_STORAGE_SAS_TOKEN")) != NULL)) {
99 sas_token = std::string(getenv("AZURE_STORAGE_SAS_TOKEN"));
100 }
101
102 std::string blob_endpoint = config.get("vfs.azure.blob_endpoint", &found);
103 assert(found);
104 if (blob_endpoint.empty() &&
105 ((tmp = getenv("AZURE_BLOB_ENDPOINT")) != NULL)) {
106 blob_endpoint = std::string(getenv("AZURE_BLOB_ENDPOINT"));
107 }
108
109 bool use_https;
110 RETURN_NOT_OK(config.get<bool>("vfs.azure.use_https", &use_https, &found));
111 assert(found);
112 // note that the default here is use_https=true, so the logic below is
113 // inverted
114 if (use_https && ((tmp = getenv("AZURE_USE_HTTPS")) != NULL)) {
115 std::string use_https_env(tmp);
116 use_https = (!use_https_env.empty());
117 }
118
119 RETURN_NOT_OK(config.get<uint64_t>(
120 "vfs.azure.max_parallel_ops", &max_parallel_ops_, &found));
121 assert(found);
122 RETURN_NOT_OK(config.get<uint64_t>(
123 "vfs.azure.block_list_block_size", &block_list_block_size_, &found));
124 assert(found);
125 RETURN_NOT_OK(config.get<bool>(
126 "vfs.azure.use_block_list_upload", &use_block_list_upload_, &found));
127 assert(found);
128
129 write_cache_max_size_ = max_parallel_ops_ * block_list_block_size_;
130
131 // Initialize a credential object
132 std::shared_ptr<azure::storage_lite::storage_credential> credential =
133 std::make_shared<azure::storage_lite::shared_key_credential>(
134 account_name, account_key);
135
136 // Use the SAS token, if provided.
137 if (!sas_token.empty()) {
138 // clang-format off
139 credential = std::make_shared<azure::storage_lite::shared_access_signature_credential>(sas_token);
140 // clang-format on
141 }
142
143 std::shared_ptr<azure::storage_lite::storage_account> account =
144 std::make_shared<azure::storage_lite::storage_account>(
145 account_name, credential, use_https, blob_endpoint);
146
147 // Construct the Azure SDK blob client with a concurrency level
148 // equal to 'thread_pool_->concurrency_level'. Internally, the client
149 // will allocate an equal number of libcurl sessions with
150 // 'curl_easy_init'. This ensures that our 'thread_pool_' threads
151 // will not block on the blob client's internal request queue
152 // unless the user is performing concurrent I/O on this instance.
153 #ifdef __linux__
154 // Get CA Cert bundle file from global state. This is initialized and cached
155 // if detected. We have only had issues with finding the certificate path on
156 // Linux.
157 const std::string cert_file =
158 global_state::GlobalState::GetGlobalState().cert_file();
159 client_ = tdb_make_shared(
160 azure::storage_lite::blob_client,
161 account,
162 thread_pool_->concurrency_level(),
163 cert_file);
164 #else
165 client_ = tdb_make_shared(
166 azure::storage_lite::blob_client,
167 account,
168 thread_pool_->concurrency_level());
169 #endif
170
171 // The Azure SDK does not provide a way to configure the retry
172 // policy or construct a client with our own retry policy. This
173 // re-assigns the context with our own retry policy.
174 *client_->context() = azure::storage_lite::executor_context(
175 std::make_shared<azure::storage_lite::tinyxml2_parser>(),
176 std::make_shared<AzureRetryPolicy>());
177
178 return Status::Ok();
179 }
180
create_container(const URI & uri) const181 Status Azure::create_container(const URI& uri) const {
182 assert(client_);
183
184 if (!uri.is_azure()) {
185 return LOG_STATUS(Status::AzureError(
186 std::string("URI is not an Azure URI: " + uri.to_string())));
187 }
188
189 std::string container_name;
190 RETURN_NOT_OK(parse_azure_uri(uri, &container_name, nullptr));
191
192 std::future<azure::storage_lite::storage_outcome<void>> result =
193 client_->create_container(container_name);
194 if (!result.valid()) {
195 return LOG_STATUS(Status::AzureError(
196 std::string("Create container failed on: " + uri.to_string())));
197 }
198
199 azure::storage_lite::storage_outcome<void> outcome = result.get();
200 if (!outcome.success()) {
201 return LOG_STATUS(Status::AzureError(
202 std::string("Create container failed on: " + uri.to_string())));
203 }
204
205 return wait_for_container_to_propagate(container_name);
206 }
207
wait_for_container_to_propagate(const std::string & container_name) const208 Status Azure::wait_for_container_to_propagate(
209 const std::string& container_name) const {
210 unsigned attempts = 0;
211 while (attempts++ < constants::azure_max_attempts) {
212 bool is_container;
213 RETURN_NOT_OK(this->is_container(container_name, &is_container));
214
215 if (is_container) {
216 return Status::Ok();
217 }
218
219 std::this_thread::sleep_for(
220 std::chrono::milliseconds(constants::azure_attempt_sleep_ms));
221 }
222
223 return LOG_STATUS(Status::AzureError(std::string(
224 "Timed out waiting on container to propogate: " + container_name)));
225 }
226
wait_for_container_to_be_deleted(const std::string & container_name) const227 Status Azure::wait_for_container_to_be_deleted(
228 const std::string& container_name) const {
229 assert(client_);
230
231 unsigned attempts = 0;
232 while (attempts++ < constants::azure_max_attempts) {
233 bool is_container;
234 RETURN_NOT_OK(this->is_container(container_name, &is_container));
235 if (!is_container) {
236 return Status::Ok();
237 }
238
239 std::this_thread::sleep_for(
240 std::chrono::milliseconds(constants::azure_attempt_sleep_ms));
241 }
242
243 return LOG_STATUS(Status::AzureError(std::string(
244 "Timed out waiting on container to be deleted: " + container_name)));
245 }
246
/**
 * Removes the contents of a container.
 *
 * @param container URI of the container to empty.
 * @return Status The result of `remove_dir(container)`.
 */
Status Azure::empty_container(const URI& container) const {
  assert(client_);

  // Emptying is delegated to the directory removal routine, applied to the
  // container URI itself.
  return remove_dir(container);
}
252
flush_blob(const URI & uri)253 Status Azure::flush_blob(const URI& uri) {
254 assert(client_);
255
256 if (!use_block_list_upload_) {
257 return flush_blob_direct(uri);
258 }
259
260 if (!uri.is_azure()) {
261 return LOG_STATUS(Status::AzureError(
262 std::string("URI is not an Azure URI: " + uri.to_string())));
263 }
264
265 Buffer* const write_cache_buffer = get_write_cache_buffer(uri.to_string());
266
267 const Status flush_write_cache_st =
268 flush_write_cache(uri, write_cache_buffer, true);
269
270 std::unique_lock<std::mutex> states_lock(block_list_upload_states_lock_);
271
272 if (block_list_upload_states_.count(uri.to_string()) == 0) {
273 return flush_write_cache_st;
274 }
275
276 BlockListUploadState* const state =
277 &block_list_upload_states_.at(uri.to_string());
278
279 states_lock.unlock();
280
281 std::string container_name;
282 std::string blob_path;
283 RETURN_NOT_OK(parse_azure_uri(uri, &container_name, &blob_path));
284
285 if (!state->st().ok()) {
286 // Save the return status because 'state' will be freed before we return.
287 const Status st = state->st();
288
289 // Unlike S3 that can abort a chunked upload to immediately release
290 // uncommited chunks and leave the original object unmodified, the
291 // only way to do this on Azure is by some form of a write. We must
292 // either:
293 // 1. Delete the blob
294 // 2. Overwrite the blob with a zero-length buffer.
295 //
296 // Alternatively, we could do nothing and let Azure release the
297 // uncommited blocks ~7 days later. We chose to delete the blob
298 // as a best-effort operation. We intentionally are ignoring the
299 // returned Status from 'remove_blob'.
300 remove_blob(uri);
301
302 // Release all instance state associated with this block list
303 // transactions.
304 finish_block_list_upload(uri);
305
306 return st;
307 }
308
309 // Build the block list to commit.
310 const std::list<std::string> block_ids = state->get_block_ids();
311 std::vector<azure::storage_lite::put_block_list_request_base::block_item>
312 block_list;
313 block_list.reserve(block_ids.size());
314 for (const auto& block_id : state->get_block_ids()) {
315 azure::storage_lite::put_block_list_request_base::block_item block;
316 block.id = block_id;
317 block.type = azure::storage_lite::put_block_list_request_base::block_type::
318 uncommitted;
319 block_list.emplace_back(block);
320 }
321
322 // We do not store any custom metadata with the blob.
323 std::vector<std::pair<std::string, std::string>> empty_metadata;
324
325 // Release all instance state associated with this block list
326 // transactions so that we can safely return if the following
327 // request failed.
328 finish_block_list_upload(uri);
329
330 std::future<azure::storage_lite::storage_outcome<void>> result =
331 client_->put_block_list(
332 container_name, blob_path, block_list, empty_metadata);
333 if (!result.valid()) {
334 return LOG_STATUS(Status::AzureError(
335 std::string("Flush blob failed on: " + uri.to_string())));
336 }
337
338 azure::storage_lite::storage_outcome<void> outcome = result.get();
339 if (!outcome.success()) {
340 return LOG_STATUS(Status::AzureError(
341 std::string("Flush blob failed on: " + uri.to_string())));
342 }
343
344 return wait_for_blob_to_propagate(container_name, blob_path);
345 }
346
finish_block_list_upload(const URI & uri)347 void Azure::finish_block_list_upload(const URI& uri) {
348 // Protect 'block_list_upload_states_' from multiple writers.
349 std::unique_lock<std::mutex> states_lock(block_list_upload_states_lock_);
350 block_list_upload_states_.erase(uri.to_string());
351 states_lock.unlock();
352
353 // Protect 'write_cache_map_' from multiple writers.
354 std::unique_lock<std::mutex> cache_lock(write_cache_map_lock_);
355 write_cache_map_.erase(uri.to_string());
356 cache_lock.unlock();
357 }
358
flush_blob_direct(const URI & uri)359 Status Azure::flush_blob_direct(const URI& uri) {
360 if (!uri.is_azure()) {
361 return LOG_STATUS(Status::AzureError(
362 std::string("URI is not an Azure URI: " + uri.to_string())));
363 }
364
365 Buffer* const write_cache_buffer = get_write_cache_buffer(uri.to_string());
366
367 if (write_cache_buffer->size() == 0) {
368 return Status::Ok();
369 }
370
371 std::string container_name;
372 std::string blob_path;
373 RETURN_NOT_OK(parse_azure_uri(uri, &container_name, &blob_path));
374
375 // We do not store any custom metadata with the blob.
376 std::vector<std::pair<std::string, std::string>> empty_metadata;
377
378 // Unlike the 'upload_block_from_buffer' interface used in
379 // the block list upload path, there is not an interface to
380 // upload a single blob with a buffer. There is only
381 // 'upload_block_blob_from_stream'. Here, we construct a
382 // zero-copy stream buffer.
383 ZeroCopyStreamBuffer zc_stream_buffer(
384 static_cast<char*>(write_cache_buffer->data()),
385 write_cache_buffer->size());
386 std::istream zc_istream(&zc_stream_buffer);
387
388 std::future<azure::storage_lite::storage_outcome<void>> result =
389 client_->upload_block_blob_from_stream(
390 container_name,
391 blob_path,
392 zc_istream,
393 empty_metadata,
394 write_cache_buffer->size());
395 if (!result.valid()) {
396 return LOG_STATUS(Status::AzureError(
397 std::string("Flush blob failed on: " + uri.to_string())));
398 }
399
400 azure::storage_lite::storage_outcome<void> outcome = result.get();
401 if (!outcome.success()) {
402 return LOG_STATUS(Status::AzureError(
403 std::string("Flush blob failed on: " + uri.to_string())));
404 }
405
406 // Protect 'write_cache_map_' from multiple writers.
407 std::unique_lock<std::mutex> cache_lock(write_cache_map_lock_);
408 write_cache_map_.erase(uri.to_string());
409 cache_lock.unlock();
410
411 return wait_for_blob_to_propagate(container_name, blob_path);
412 }
413
is_empty_container(const URI & uri,bool * is_empty) const414 Status Azure::is_empty_container(const URI& uri, bool* is_empty) const {
415 assert(client_);
416 assert(is_empty);
417
418 if (!uri.is_azure()) {
419 return LOG_STATUS(Status::AzureError(
420 std::string("URI is not an Azure URI: " + uri.to_string())));
421 }
422
423 std::string container_name;
424 RETURN_NOT_OK(parse_azure_uri(uri, &container_name, nullptr));
425
426 std::future<azure::storage_lite::storage_outcome<
427 azure::storage_lite::list_blobs_segmented_response>>
428 result = client_->list_blobs_segmented(container_name, "", "", "", 1);
429 if (!result.valid()) {
430 return LOG_STATUS(Status::AzureError(
431 std::string("List blobs failed on: " + uri.to_string())));
432 }
433
434 azure::storage_lite::storage_outcome<
435 azure::storage_lite::list_blobs_segmented_response>
436 outcome = result.get();
437 if (!outcome.success()) {
438 return LOG_STATUS(Status::AzureError(
439 std::string("List blobs failed on: " + uri.to_string())));
440 }
441
442 azure::storage_lite::list_blobs_segmented_response response =
443 outcome.response();
444
445 *is_empty = response.blobs.empty();
446
447 return Status::Ok();
448 }
449
is_container(const URI & uri,bool * const is_container) const450 Status Azure::is_container(const URI& uri, bool* const is_container) const {
451 assert(is_container);
452
453 if (!uri.is_azure()) {
454 return LOG_STATUS(Status::AzureError(
455 std::string("URI is not an Azure URI: " + uri.to_string())));
456 }
457
458 std::string container_name;
459 RETURN_NOT_OK(parse_azure_uri(uri, &container_name, nullptr));
460
461 return this->is_container(container_name, is_container);
462 }
463
is_container(const std::string & container_name,bool * const is_container) const464 Status Azure::is_container(
465 const std::string& container_name, bool* const is_container) const {
466 assert(client_);
467 assert(is_container);
468
469 std::future<azure::storage_lite::storage_outcome<
470 azure::storage_lite::container_property>>
471 result = client_->get_container_properties(container_name);
472 if (!result.valid()) {
473 return LOG_STATUS(Status::AzureError(
474 std::string("Get container properties failed on: " + container_name)));
475 }
476
477 azure::storage_lite::storage_outcome<azure::storage_lite::container_property>
478 outcome = result.get();
479 if (!outcome.success()) {
480 *is_container = false;
481 return Status::Ok();
482 }
483
484 azure::storage_lite::container_property response = outcome.response();
485
486 *is_container = response.valid();
487 return Status::Ok();
488 }
489
is_dir(const URI & uri,bool * const exists) const490 Status Azure::is_dir(const URI& uri, bool* const exists) const {
491 assert(client_);
492 assert(exists);
493
494 std::vector<std::string> paths;
495 RETURN_NOT_OK(ls(uri, &paths, "/", 1));
496 *exists = (bool)paths.size();
497 return Status::Ok();
498 }
499
is_blob(const URI & uri,bool * const is_blob) const500 Status Azure::is_blob(const URI& uri, bool* const is_blob) const {
501 assert(is_blob);
502
503 std::string container_name;
504 std::string blob_path;
505 RETURN_NOT_OK(parse_azure_uri(uri, &container_name, &blob_path));
506
507 return this->is_blob(container_name, blob_path, is_blob);
508 }
509
is_blob(const std::string & container_name,const std::string & blob_path,bool * const is_blob) const510 Status Azure::is_blob(
511 const std::string& container_name,
512 const std::string& blob_path,
513 bool* const is_blob) const {
514 assert(client_);
515 assert(is_blob);
516
517 std::future<
518 azure::storage_lite::storage_outcome<azure::storage_lite::blob_property>>
519 result = client_->get_blob_properties(container_name, blob_path);
520 if (!result.valid()) {
521 return LOG_STATUS(Status::AzureError(
522 std::string("Get blob properties failed on: " + blob_path)));
523 }
524
525 azure::storage_lite::storage_outcome<azure::storage_lite::blob_property>
526 outcome = result.get();
527 if (!outcome.success()) {
528 *is_blob = false;
529 return Status::Ok();
530 }
531
532 azure::storage_lite::blob_property response = outcome.response();
533
534 *is_blob = response.valid();
535 return Status::Ok();
536 }
537
remove_front_slash(const std::string & path) const538 std::string Azure::remove_front_slash(const std::string& path) const {
539 if (path.front() == '/') {
540 return path.substr(1, path.length());
541 }
542
543 return path;
544 }
545
add_trailing_slash(const std::string & path) const546 std::string Azure::add_trailing_slash(const std::string& path) const {
547 if (path.back() != '/') {
548 return path + "/";
549 }
550
551 return path;
552 }
553
remove_trailing_slash(const std::string & path) const554 std::string Azure::remove_trailing_slash(const std::string& path) const {
555 if (path.back() == '/') {
556 return path.substr(0, path.length() - 1);
557 }
558
559 return path;
560 }
561
ls(const URI & uri,std::vector<std::string> * paths,const std::string & delimiter,const int max_paths) const562 Status Azure::ls(
563 const URI& uri,
564 std::vector<std::string>* paths,
565 const std::string& delimiter,
566 const int max_paths) const {
567 assert(client_);
568 assert(paths);
569
570 const URI uri_dir = uri.add_trailing_slash();
571
572 if (!uri_dir.is_azure()) {
573 return LOG_STATUS(Status::AzureError(
574 std::string("URI is not an Azure URI: " + uri_dir.to_string())));
575 }
576
577 std::string container_name;
578 std::string blob_path;
579 RETURN_NOT_OK(parse_azure_uri(uri_dir, &container_name, &blob_path));
580
581 std::string continuation_token = "";
582 do {
583 std::future<azure::storage_lite::storage_outcome<
584 azure::storage_lite::list_blobs_segmented_response>>
585 result = client_->list_blobs_segmented(
586 container_name,
587 delimiter,
588 continuation_token,
589 blob_path,
590 max_paths > 0 ? max_paths : 5000);
591 if (!result.valid()) {
592 return LOG_STATUS(Status::AzureError(
593 std::string("List blobs failed on: " + uri_dir.to_string())));
594 }
595
596 azure::storage_lite::storage_outcome<
597 azure::storage_lite::list_blobs_segmented_response>
598 outcome = result.get();
599 if (!outcome.success()) {
600 return LOG_STATUS(Status::AzureError(
601 std::string("List blobs failed on: " + uri_dir.to_string())));
602 }
603
604 azure::storage_lite::list_blobs_segmented_response response =
605 outcome.response();
606
607 for (const auto& blob : response.blobs) {
608 paths->emplace_back(
609 "azure://" + container_name + "/" +
610 remove_front_slash(remove_trailing_slash(blob.name)));
611 }
612
613 continuation_token = response.next_marker;
614 } while (!continuation_token.empty());
615
616 return Status::Ok();
617 }
618
move_object(const URI & old_uri,const URI & new_uri)619 Status Azure::move_object(const URI& old_uri, const URI& new_uri) {
620 assert(client_);
621 RETURN_NOT_OK(copy_blob(old_uri, new_uri));
622 RETURN_NOT_OK(remove_blob(old_uri));
623 return Status::Ok();
624 }
625
copy_blob(const URI & old_uri,const URI & new_uri)626 Status Azure::copy_blob(const URI& old_uri, const URI& new_uri) {
627 assert(client_);
628
629 if (!old_uri.is_azure()) {
630 return LOG_STATUS(Status::AzureError(
631 std::string("URI is not an Azure URI: " + old_uri.to_string())));
632 }
633
634 if (!new_uri.is_azure()) {
635 return LOG_STATUS(Status::AzureError(
636 std::string("URI is not an Azure URI: " + new_uri.to_string())));
637 }
638
639 std::string old_container_name;
640 std::string old_blob_path;
641 RETURN_NOT_OK(parse_azure_uri(old_uri, &old_container_name, &old_blob_path));
642
643 std::string new_container_name;
644 std::string new_blob_path;
645 RETURN_NOT_OK(parse_azure_uri(new_uri, &new_container_name, &new_blob_path));
646
647 std::future<azure::storage_lite::storage_outcome<void>> result =
648 client_->start_copy(
649 old_container_name, old_blob_path, new_container_name, new_blob_path);
650 if (!result.valid()) {
651 return LOG_STATUS(Status::AzureError(
652 std::string("Copy blob failed on: " + old_uri.to_string())));
653 }
654
655 azure::storage_lite::storage_outcome<void> outcome = result.get();
656 if (!outcome.success()) {
657 return LOG_STATUS(Status::AzureError(
658 std::string("Copy blob failed on: " + old_uri.to_string())));
659 }
660
661 return wait_for_blob_to_propagate(new_container_name, new_blob_path);
662 }
663
wait_for_blob_to_propagate(const std::string & container_name,const std::string & blob_path) const664 Status Azure::wait_for_blob_to_propagate(
665 const std::string& container_name, const std::string& blob_path) const {
666 assert(client_);
667
668 unsigned attempts = 0;
669 while (attempts++ < constants::azure_max_attempts) {
670 bool is_blob;
671 RETURN_NOT_OK(this->is_blob(container_name, blob_path, &is_blob));
672 if (is_blob) {
673 return Status::Ok();
674 }
675
676 std::this_thread::sleep_for(
677 std::chrono::milliseconds(constants::azure_attempt_sleep_ms));
678 }
679
680 return LOG_STATUS(Status::AzureError(
681 std::string("Timed out waiting on blob to propogate: " + blob_path)));
682 }
683
wait_for_blob_to_be_deleted(const std::string & container_name,const std::string & blob_path) const684 Status Azure::wait_for_blob_to_be_deleted(
685 const std::string& container_name, const std::string& blob_path) const {
686 assert(client_);
687
688 unsigned attempts = 0;
689 while (attempts++ < constants::azure_max_attempts) {
690 bool is_blob;
691 RETURN_NOT_OK(this->is_blob(container_name, blob_path, &is_blob));
692 if (!is_blob) {
693 return Status::Ok();
694 }
695
696 std::this_thread::sleep_for(
697 std::chrono::milliseconds(constants::azure_attempt_sleep_ms));
698 }
699
700 return LOG_STATUS(Status::AzureError(
701 std::string("Timed out waiting on blob to be deleted: " + blob_path)));
702 }
703
move_dir(const URI & old_uri,const URI & new_uri)704 Status Azure::move_dir(const URI& old_uri, const URI& new_uri) {
705 assert(client_);
706
707 std::vector<std::string> paths;
708 RETURN_NOT_OK(ls(old_uri, &paths, ""));
709 for (const auto& path : paths) {
710 const std::string suffix = path.substr(old_uri.to_string().size());
711 const URI new_path = new_uri.join_path(suffix);
712 RETURN_NOT_OK(move_object(URI(path), new_path));
713 }
714 return Status::Ok();
715 }
716
blob_size(const URI & uri,uint64_t * const nbytes) const717 Status Azure::blob_size(const URI& uri, uint64_t* const nbytes) const {
718 assert(client_);
719 assert(nbytes);
720
721 if (!uri.is_azure()) {
722 return LOG_STATUS(Status::AzureError(
723 std::string("URI is not an Azure URI: " + uri.to_string())));
724 }
725
726 std::string container_name;
727 std::string blob_path;
728 RETURN_NOT_OK(parse_azure_uri(uri, &container_name, &blob_path));
729
730 std::future<azure::storage_lite::storage_outcome<
731 azure::storage_lite::list_blobs_segmented_response>>
732 result =
733 client_->list_blobs_segmented(container_name, "", "", blob_path, 1);
734 if (!result.valid()) {
735 return LOG_STATUS(Status::AzureError(
736 std::string("Get blob size failed on: " + uri.to_string())));
737 }
738
739 azure::storage_lite::storage_outcome<
740 azure::storage_lite::list_blobs_segmented_response>
741 outcome = result.get();
742 if (!outcome.success()) {
743 return LOG_STATUS(Status::AzureError(
744 std::string("Get blob size failed on: " + uri.to_string())));
745 }
746
747 azure::storage_lite::list_blobs_segmented_response response =
748 outcome.response();
749
750 if (response.blobs.empty()) {
751 return LOG_STATUS(Status::AzureError(
752 std::string("Get blob size failed on: " + uri.to_string())));
753 }
754
755 const azure::storage_lite::list_blobs_segmented_item& blob =
756 response.blobs[0];
757
758 *nbytes = blob.content_length;
759
760 return Status::Ok();
761 }
762
read(const URI & uri,const off_t offset,void * const buffer,const uint64_t length,const uint64_t read_ahead_length,uint64_t * const length_returned) const763 Status Azure::read(
764 const URI& uri,
765 const off_t offset,
766 void* const buffer,
767 const uint64_t length,
768 const uint64_t read_ahead_length,
769 uint64_t* const length_returned) const {
770 assert(client_);
771
772 if (!uri.is_azure()) {
773 return LOG_STATUS(Status::AzureError(
774 std::string("URI is not an Azure URI: " + uri.to_string())));
775 }
776
777 std::string container_name;
778 std::string blob_path;
779 RETURN_NOT_OK(parse_azure_uri(uri, &container_name, &blob_path));
780
781 std::stringstream ss;
782 std::future<azure::storage_lite::storage_outcome<void>> result =
783 client_->download_blob_to_stream(
784 container_name, blob_path, offset, length + read_ahead_length, ss);
785 if (!result.valid()) {
786 return LOG_STATUS(Status::AzureError(
787 std::string("Read blob failed on: " + uri.to_string())));
788 }
789
790 azure::storage_lite::storage_outcome<void> outcome = result.get();
791 if (!outcome.success()) {
792 return LOG_STATUS(Status::AzureError(
793 std::string("Read blob failed on: " + uri.to_string())));
794 }
795
796 ss.read(static_cast<char*>(buffer), length + read_ahead_length);
797 *length_returned = ss.gcount();
798
799 if (*length_returned < length) {
800 return LOG_STATUS(Status::AzureError(
801 std::string("Read operation read unexpected number of bytes.")));
802 }
803
804 return Status::Ok();
805 }
806
remove_container(const URI & uri) const807 Status Azure::remove_container(const URI& uri) const {
808 assert(client_);
809
810 // Empty container
811 RETURN_NOT_OK(empty_container(uri));
812
813 std::string container_name;
814 RETURN_NOT_OK(parse_azure_uri(uri, &container_name, nullptr));
815
816 std::future<azure::storage_lite::storage_outcome<void>> result =
817 client_->delete_container(container_name);
818 if (!result.valid()) {
819 return LOG_STATUS(Status::AzureError(
820 std::string("Remove container failed on: " + uri.to_string())));
821 }
822
823 azure::storage_lite::storage_outcome<void> outcome = result.get();
824 if (!outcome.success()) {
825 return LOG_STATUS(Status::AzureError(
826 std::string("Remove container failed on: " + uri.to_string())));
827 }
828
829 return wait_for_container_to_be_deleted(container_name);
830 }
831
remove_blob(const URI & uri) const832 Status Azure::remove_blob(const URI& uri) const {
833 assert(client_);
834
835 std::string container_name;
836 std::string blob_path;
837 RETURN_NOT_OK(parse_azure_uri(uri, &container_name, &blob_path));
838
839 std::future<azure::storage_lite::storage_outcome<void>> result =
840 client_->delete_blob(
841 container_name, blob_path, false /* delete_snapshots */);
842 if (!result.valid()) {
843 return LOG_STATUS(Status::AzureError(
844 std::string("Remove blob failed on: " + uri.to_string())));
845 }
846
847 azure::storage_lite::storage_outcome<void> outcome = result.get();
848 if (!outcome.success()) {
849 return LOG_STATUS(Status::AzureError(
850 std::string("Remove blob failed on: " + uri.to_string())));
851 }
852
853 return wait_for_blob_to_be_deleted(container_name, blob_path);
854 }
855
remove_dir(const URI & uri) const856 Status Azure::remove_dir(const URI& uri) const {
857 assert(client_);
858
859 std::vector<std::string> paths;
860 RETURN_NOT_OK(ls(uri, &paths, ""));
861 for (const auto& path : paths) {
862 RETURN_NOT_OK(remove_blob(URI(path)));
863 }
864
865 return Status::Ok();
866 }
867
touch(const URI & uri) const868 Status Azure::touch(const URI& uri) const {
869 assert(client_);
870
871 if (!uri.is_azure()) {
872 return LOG_STATUS(Status::AzureError(
873 std::string("URI is not an Azure URI: " + uri.to_string())));
874 }
875
876 if (uri.to_string().back() == '/') {
877 return LOG_STATUS(Status::AzureError(std::string(
878 "Cannot create file; URI is a directory: " + uri.to_string())));
879 }
880
881 bool is_blob;
882 RETURN_NOT_OK(this->is_blob(uri, &is_blob));
883 if (is_blob) {
884 return Status::Ok();
885 }
886
887 std::string container_name;
888 std::string blob_path;
889 RETURN_NOT_OK(parse_azure_uri(uri, &container_name, &blob_path));
890
891 std::stringstream empty_ss;
892 std::vector<std::pair<std::string, std::string>> empty_metadata;
893 std::future<azure::storage_lite::storage_outcome<void>> result =
894 client_->upload_block_blob_from_stream(
895 container_name, blob_path, empty_ss, empty_metadata);
896 if (!result.valid()) {
897 return LOG_STATUS(Status::AzureError(
898 std::string("Touch blob failed on: " + uri.to_string())));
899 }
900
901 azure::storage_lite::storage_outcome<void> outcome = result.get();
902 if (!outcome.success()) {
903 return LOG_STATUS(Status::AzureError(
904 std::string("Touch blob failed on: " + uri.to_string())));
905 }
906
907 return Status::Ok();
908 }
909
write(const URI & uri,const void * const buffer,const uint64_t length)910 Status Azure::write(
911 const URI& uri, const void* const buffer, const uint64_t length) {
912 if (!uri.is_azure()) {
913 return LOG_STATUS(Status::AzureError(
914 std::string("URI is not an Azure URI: " + uri.to_string())));
915 }
916
917 Buffer* const write_cache_buffer = get_write_cache_buffer(uri.to_string());
918
919 uint64_t nbytes_filled;
920 RETURN_NOT_OK(
921 fill_write_cache(write_cache_buffer, buffer, length, &nbytes_filled));
922
923 if (!use_block_list_upload_) {
924 if (nbytes_filled != length) {
925 std::stringstream errmsg;
926 errmsg << "Direct write failed! " << nbytes_filled
927 << " bytes written to buffer, " << length << " bytes requested.";
928 return LOG_STATUS(Status::AzureError(errmsg.str()));
929 } else {
930 return Status::Ok();
931 }
932 }
933
934 if (write_cache_buffer->size() == write_cache_max_size_) {
935 RETURN_NOT_OK(flush_write_cache(uri, write_cache_buffer, false));
936 }
937
938 uint64_t new_length = length - nbytes_filled;
939 uint64_t offset = nbytes_filled;
940 while (new_length > 0) {
941 if (new_length >= write_cache_max_size_) {
942 RETURN_NOT_OK(write_blocks(
943 uri,
944 static_cast<const char*>(buffer) + offset,
945 write_cache_max_size_,
946 false));
947 offset += write_cache_max_size_;
948 new_length -= write_cache_max_size_;
949 } else {
950 RETURN_NOT_OK(fill_write_cache(
951 write_cache_buffer,
952 static_cast<const char*>(buffer) + offset,
953 new_length,
954 &nbytes_filled));
955 offset += nbytes_filled;
956 new_length -= nbytes_filled;
957 }
958 }
959
960 assert(offset == length);
961
962 return Status::Ok();
963 }
964
get_write_cache_buffer(const std::string & uri)965 Buffer* Azure::get_write_cache_buffer(const std::string& uri) {
966 std::unique_lock<std::mutex> map_lock(write_cache_map_lock_);
967 if (write_cache_map_.count(uri) > 0) {
968 return &write_cache_map_.at(uri);
969 } else {
970 return &write_cache_map_[uri];
971 }
972 }
973
fill_write_cache(Buffer * const write_cache_buffer,const void * const buffer,const uint64_t length,uint64_t * const nbytes_filled)974 Status Azure::fill_write_cache(
975 Buffer* const write_cache_buffer,
976 const void* const buffer,
977 const uint64_t length,
978 uint64_t* const nbytes_filled) {
979 assert(write_cache_buffer);
980 assert(buffer);
981 assert(nbytes_filled);
982
983 *nbytes_filled =
984 std::min(write_cache_max_size_ - write_cache_buffer->size(), length);
985
986 if (*nbytes_filled > 0) {
987 RETURN_NOT_OK(write_cache_buffer->write(buffer, *nbytes_filled));
988 }
989
990 return Status::Ok();
991 }
992
flush_write_cache(const URI & uri,Buffer * const write_cache_buffer,const bool last_block)993 Status Azure::flush_write_cache(
994 const URI& uri, Buffer* const write_cache_buffer, const bool last_block) {
995 assert(write_cache_buffer);
996
997 if (write_cache_buffer->size() > 0) {
998 const Status st = write_blocks(
999 uri,
1000 write_cache_buffer->data(),
1001 write_cache_buffer->size(),
1002 last_block);
1003 write_cache_buffer->reset_size();
1004 RETURN_NOT_OK(st);
1005 }
1006
1007 return Status::Ok();
1008 }
1009
write_blocks(const URI & uri,const void * const buffer,const uint64_t length,const bool last_block)1010 Status Azure::write_blocks(
1011 const URI& uri,
1012 const void* const buffer,
1013 const uint64_t length,
1014 const bool last_block) {
1015 if (!uri.is_azure()) {
1016 return LOG_STATUS(Status::AzureError(
1017 std::string("URI is not an Azure URI: " + uri.to_string())));
1018 }
1019
1020 // Ensure that each thread is responsible for exactly block_list_block_size_
1021 // bytes (except if this is the last block, in which case the final
1022 // thread should write less). Cap the number of parallel operations at the
1023 // configured max number. Length must be evenly divisible by
1024 // block_list_block_size_ unless this is the last block.
1025 uint64_t num_ops = last_block ?
1026 utils::math::ceil(length, block_list_block_size_) :
1027 (length / block_list_block_size_);
1028 num_ops = std::min(std::max(num_ops, uint64_t(1)), max_parallel_ops_);
1029
1030 if (!last_block && length % block_list_block_size_ != 0) {
1031 return LOG_STATUS(
1032 Status::AzureError("Length not evenly divisible by block size"));
1033 }
1034
1035 // Protect 'block_list_upload_states_' from concurrent read and writes.
1036 std::unique_lock<std::mutex> states_lock(block_list_upload_states_lock_);
1037
1038 auto state_iter = block_list_upload_states_.find(uri.to_string());
1039 if (state_iter == block_list_upload_states_.end()) {
1040 // Delete file if it exists (overwrite).
1041 bool exists;
1042 RETURN_NOT_OK(is_blob(uri, &exists));
1043 if (exists) {
1044 RETURN_NOT_OK(remove_blob(uri));
1045 }
1046
1047 // Instantiate the new state.
1048 BlockListUploadState state;
1049
1050 // Store the new state.
1051 const std::pair<
1052 std::unordered_map<std::string, BlockListUploadState>::iterator,
1053 bool>
1054 emplaced = block_list_upload_states_.emplace(
1055 uri.to_string(), std::move(state));
1056 assert(emplaced.second);
1057 state_iter = emplaced.first;
1058 }
1059
1060 BlockListUploadState* const state = &state_iter->second;
1061
1062 // We're done reading and writing from 'block_list_upload_states_'. Mutating
1063 // the 'state' element does not affect the thread-safety of
1064 // 'block_list_upload_states_'.
1065 states_lock.unlock();
1066
1067 std::string container_name;
1068 std::string blob_path;
1069 RETURN_NOT_OK(parse_azure_uri(uri, &container_name, &blob_path));
1070
1071 if (num_ops == 1) {
1072 const std::string block_id = state->next_block_id();
1073
1074 const Status st =
1075 upload_block(container_name, blob_path, buffer, length, block_id);
1076 state->update_st(st);
1077 return st;
1078 } else {
1079 std::vector<ThreadPool::Task> tasks;
1080 tasks.reserve(num_ops);
1081 for (uint64_t i = 0; i < num_ops; i++) {
1082 const uint64_t begin = i * block_list_block_size_;
1083 const uint64_t end =
1084 std::min((i + 1) * block_list_block_size_ - 1, length - 1);
1085 const char* const thread_buffer =
1086 reinterpret_cast<const char*>(buffer) + begin;
1087 const uint64_t thread_buffer_len = end - begin + 1;
1088 const std::string block_id = state->next_block_id();
1089
1090 std::function<Status()> upload_block_fn = std::bind(
1091 &Azure::upload_block,
1092 this,
1093 container_name,
1094 blob_path,
1095 thread_buffer,
1096 thread_buffer_len,
1097 block_id);
1098 ThreadPool::Task task = thread_pool_->execute(std::move(upload_block_fn));
1099 tasks.emplace_back(std::move(task));
1100 }
1101
1102 const Status st = thread_pool_->wait_all(tasks);
1103 state->update_st(st);
1104 return st;
1105 }
1106
1107 return Status::Ok();
1108 }
1109
upload_block(const std::string & container_name,const std::string & blob_path,const void * const buffer,const uint64_t length,const std::string & block_id)1110 Status Azure::upload_block(
1111 const std::string& container_name,
1112 const std::string& blob_path,
1113 const void* const buffer,
1114 const uint64_t length,
1115 const std::string& block_id) {
1116 // The 'const_cast' is necessary because the SDK API requires a
1117 // non-const 'buffer'. However, this is safe because the SDK does
1118 // not actually mutate 'buffer'.
1119 //
1120 // This may removed once the following PR is merged and released:
1121 // https://github.com/Azure/azure-storage-cpplite/pull/64
1122 std::future<azure::storage_lite::storage_outcome<void>> result =
1123 client_->upload_block_from_buffer(
1124 container_name,
1125 blob_path,
1126 block_id,
1127 const_cast<char*>(static_cast<const char*>(buffer)),
1128 length);
1129
1130 if (!result.valid()) {
1131 return LOG_STATUS(Status::AzureError(
1132 std::string("Upload block failed on: " + blob_path)));
1133 }
1134
1135 azure::storage_lite::storage_outcome<void> outcome = result.get();
1136 if (!outcome.success()) {
1137 return LOG_STATUS(Status::AzureError(
1138 std::string("Upload block failed on: " + blob_path)));
1139 }
1140
1141 return Status::Ok();
1142 }
1143
parse_azure_uri(const URI & uri,std::string * const container_name,std::string * const blob_path) const1144 Status Azure::parse_azure_uri(
1145 const URI& uri,
1146 std::string* const container_name,
1147 std::string* const blob_path) const {
1148 assert(uri.is_azure());
1149 const std::string uri_str = uri.to_string();
1150
1151 const static std::string azure_prefix = "azure://";
1152 assert(uri_str.rfind(azure_prefix, 0) == 0);
1153
1154 if (uri_str.size() == azure_prefix.size()) {
1155 if (container_name)
1156 *container_name = "";
1157 if (blob_path)
1158 *blob_path = "";
1159 return Status::Ok();
1160 }
1161
1162 // Find the '/' after the container name.
1163 const size_t separator = uri_str.find('/', azure_prefix.size() + 1);
1164
1165 // There is only a container name if there isn't a separating slash.
1166 if (separator == std::string::npos) {
1167 const size_t c_pos_start = azure_prefix.size();
1168 const size_t c_pos_end = uri_str.size();
1169 if (container_name)
1170 *container_name = uri_str.substr(c_pos_start, c_pos_end - c_pos_start);
1171 if (blob_path)
1172 *blob_path = "";
1173 return Status::Ok();
1174 }
1175
1176 // There is only a container name if there aren't any characters past the
1177 // separating slash.
1178 if (uri_str.size() == separator) {
1179 const size_t c_pos_start = azure_prefix.size();
1180 const size_t c_pos_end = separator;
1181 if (container_name)
1182 *container_name = uri_str.substr(c_pos_start, c_pos_end - c_pos_start);
1183 if (blob_path)
1184 *blob_path = "";
1185 return Status::Ok();
1186 }
1187
1188 const size_t c_pos_start = azure_prefix.size();
1189 const size_t c_pos_end = separator;
1190 const size_t b_pos_start = separator + 1;
1191 const size_t b_pos_end = uri_str.size();
1192
1193 if (container_name)
1194 *container_name = uri_str.substr(c_pos_start, c_pos_end - c_pos_start);
1195 if (blob_path)
1196 *blob_path = uri_str.substr(b_pos_start, b_pos_end - b_pos_start);
1197
1198 return Status::Ok();
1199 }
1200
1201 } // namespace sm
1202 } // namespace tiledb
1203
1204 #endif
1205