1 /**
2  * @file   azure.h
3  *
4  * @section LICENSE
5  *
6  * The MIT License
7  *
8  * @copyright Copyright (c) 2017-2021 TileDB, Inc.
9  *
10  * Permission is hereby granted, free of charge, to any person obtaining a copy
11  * of this software and associated documentation files (the "Software"), to deal
12  * in the Software without restriction, including without limitation the rights
13  * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
14  * copies of the Software, and to permit persons to whom the Software is
15  * furnished to do so, subject to the following conditions:
16  *
17  * The above copyright notice and this permission notice shall be included in
18  * all copies or substantial portions of the Software.
19  *
20  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
21  * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
22  * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
23  * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
24  * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
25  * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
26  * THE SOFTWARE.
27  *
28  * @section DESCRIPTION
29  *
30  * This file defines the Azure class.
31  */
32 
33 #ifndef TILEDB_AZURE_H
34 #define TILEDB_AZURE_H
35 
36 #ifdef HAVE_AZURE
37 #include "tiledb/common/status.h"
38 #include "tiledb/common/thread_pool.h"
39 #include "tiledb/sm/buffer/buffer.h"
40 #include "tiledb/sm/config/config.h"
41 #include "tiledb/sm/misc/constants.h"
42 #include "tiledb/sm/misc/uri.h"
43 
44 #if !defined(NOMINMAX)
45 #define NOMINMAX  // avoid min/max macros from windows headers
46 #endif
47 #include <base64.h>
48 #include <blob/blob_client.h>
49 #include <retry.h>
50 #include <storage_account.h>
51 #include <storage_credential.h>
52 #include <list>
53 #include <unordered_map>
54 
55 // azure sdk transitively includes mmtime.h which defines TIME_MS
56 #ifdef TIME_MS
57 #undef TIME_MS
58 #endif
59 
60 using namespace tiledb::common;
61 
62 namespace tiledb {
63 namespace sm {
64 
65 class Azure {
66  public:
67   /* ********************************* */
68   /*     CONSTRUCTORS & DESTRUCTORS    */
69   /* ********************************* */
70 
71   /** Constructor. */
72   Azure();
73 
74   /** Destructor. */
75   ~Azure();
76 
77   /* ********************************* */
78   /*                 API               */
79   /* ********************************* */
80 
81   /**
82    * Initializes and connects an Azure client.
83    *
84    * @param config Configuration parameters.
85    * @param thread_pool The parent VFS thread pool.
86    * @return Status
87    */
88   Status init(const Config& config, ThreadPool* thread_pool);
89 
90   /**
91    * Creates a container.
92    *
93    * @param container The name of the container to be created.
94    * @return Status
95    */
96   Status create_container(const URI& container) const;
97 
98   /** Removes the contents of an Azure container. */
99   Status empty_container(const URI& container) const;
100 
101   /**
102    * Flushes a blob to Azure, finalizing the upload.
103    *
104    * @param uri The URI of the blob to be flushed.
105    * @return Status
106    */
107   Status flush_blob(const URI& uri);
108 
109   /**
110    * Check if a container is empty.
111    *
112    * @param uri The URI of the container.
113    * @param is_empty Mutates to `true` if the container is empty.
114    * @return Status
115    */
116   Status is_empty_container(const URI& uri, bool* is_empty) const;
117 
118   /**
119    * Check if a container exists.
120    *
121    * @param uri The URI of the container.
122    * @param is_container Mutates to `true` if `uri` is a container.
123    * @return Status
124    */
125   Status is_container(const URI& uri, bool* is_container) const;
126 
127   /**
128    * Checks if there is an object with prefix `uri/`. For instance, suppose
129    * the following objects exist:
130    *
131    * `azure://some_container/foo/bar1`
132    * `azure://some_container/foo2`
133    *
134    * `is_dir("azure://some_container/foo")` and
135    * `is_dir("azure://some_container/foo/")` will both return `true`, whereas
136    * `is_dir("azure://some_container/foo2")` will return `false`. This is
137    * because the function first converts the input to
138    * `azure://some_container/foo2/` (appending `/` at the end) and then checks
139    * if there exists any object with that prefix (in this case there is not).
140    *
141    * @param uri The URI to check.
142    * @param exists Sets it to `true` if the above mentioned condition holds.
143    * @return Status
144    */
145   Status is_dir(const URI& uri, bool* exists) const;
146 
147   /**
148    * Checks if the given URI is an existing Azure blob.
149    *
150    * @param uri The URI of the object to be checked.
151    * @param is_blob Mutates to `true` if `uri` is an existing blob, and `false`
152    * otherwise.
153    */
154   Status is_blob(const URI& uri, bool* is_blob) const;
155 
156   /**
157    * Lists the objects that start with `uri`. Full URI paths are
158    * retrieved for the matched objects. If a delimiter is specified,
159    * the URI paths will be truncated to the first delimiter character.
160    * For instance, if there is a hierarchy:
161    *
162    * - `foo/bar/baz`
163    * - `foo/bar/bash`
164    * - `foo/bar/bang`
165    * - `foo/boo`
166    *
167    * and the delimiter is `/`, the returned URIs will be
168    *
169    * - `foo/boo`
170    * - `foo/bar`
171    *
172    * @param uri The prefix URI.
173    * @param paths Pointer to a vector of strings to store the retrieved paths.
174    * @param delimiter The delimiter at which the returned paths are truncated.
175    * @param max_paths The maximum number of paths to be retrieved. The default
176    *     `-1` indicates that no upper bound is specified.
177    * @return Status
178    */
179   Status ls(
180       const URI& uri,
181       std::vector<std::string>* paths,
182       const std::string& delimiter = "/",
183       int max_paths = -1) const;
184 
185   /**
186    * Renames an object.
187    *
188    * @param old_uri The URI of the old path.
189    * @param new_uri The URI of the new path.
190    * @return Status
191    */
192   Status move_object(const URI& old_uri, const URI& new_uri);
193 
194   /**
195    * Renames a directory. Note that this is an expensive operation.
196    * The function will essentially copy all objects with directory
197    * prefix `old_uri` to new objects with prefix `new_uri` and then
198    * delete the old ones.
199    *
200    * @param old_uri The URI of the old path.
201    * @param new_uri The URI of the new path.
202    * @return Status
203    */
204   Status move_dir(const URI& old_uri, const URI& new_uri);
205 
206   /**
207    * Returns the size of the input blob with a given URI in bytes.
208    *
209    * @param uri The URI of the blob.
210    * @param nbytes Pointer to `uint64_t` bytes to return.
211    * @return Status
212    */
213   Status blob_size(const URI& uri, uint64_t* nbytes) const;
214 
215   /**
216    * Reads data from an object into a buffer.
217    *
218    * @param uri The URI of the object to be read.
219    * @param offset The offset in the object from which the read will start.
220    * @param buffer The buffer into which the data will be written.
221    * @param length The size of the data to be read from the object.
222    * @param read_ahead_length The additional length to read ahead.
223    * @param length_returned Returns the total length read into `buffer`.
224    * @return Status
225    */
226   Status read(
227       const URI& uri,
228       off_t offset,
229       void* buffer,
230       uint64_t length,
231       uint64_t read_ahead_length,
232       uint64_t* length_returned) const;
233 
234   /**
235    * Deletes a container.
236    *
237    * @param uri The URI of the container to be deleted.
238    * @return Status
239    */
240   Status remove_container(const URI& uri) const;
241 
242   /**
243    * Deletes a blob with a given URI.
244    *
245    * @param uri The URI of the blob to be deleted.
246    * @return Status
247    */
248   Status remove_blob(const URI& uri) const;
249 
250   /**
251    * Deletes all objects with prefix `uri/` (if the ending `/` does not
252    * exist in `uri`, it is added by the function).
253    *
254    * For instance, suppose there exist the following objects:
255    * - `azure://some_container/foo/bar1`
256    * - `azure://some_container/foo/bar2/bar3`
257    * - `azure://some_container/foo/bar4`
258    * - `azure://some_container/foo2`
259    *
260    * `remove_dir("azure://some_container/foo")` and
261    * `remove_dir("azure://some_container/foo/")` will delete objects:
262    *
263    * - `azure://some_container/foo/bar1`
264    * - `azure://some_container/foo/bar2/bar3`
265    * - `azure://some_container/foo/bar4`
266    *
267    * In contrast, `remove_dir("azure://some_container/foo2")` will not delete
268    * anything; the function internally appends `/` to the end of the URI, and
269    * therefore there is no object with prefix "azure://some_container/foo2/" in
270    * this example.
271    *
272    * @param uri The prefix uri of the objects to be deleted.
273    * @return Status
274    */
275   Status remove_dir(const URI& uri) const;
276 
277   /**
278    * Creates an empty blob.
279    *
280    * @param uri The URI of the blob to be created.
281    * @return Status
282    */
283   Status touch(const URI& uri) const;
284 
285   /**
286    * Writes the input buffer to an Azure object. Note that this is essentially
287    * an append operation implemented via multipart uploads.
288    *
289    * @param uri The URI of the object to be written to.
290    * @param buffer The input buffer.
291    * @param length The size of the input buffer.
292    * @return Status
293    */
294   Status write(const URI& uri, const void* buffer, uint64_t length);
295 
296  private:
297   /* ********************************* */
298   /*         PRIVATE DATATYPES         */
299   /* ********************************* */
300 
301   class AzureRetryPolicy final : public azure::storage_lite::retry_policy_base {
302    public:
303     /**
304      * The SDK invokes this routine before each request to Azure. This returns
305      * a pair: a bool to indicate if we should make a request and a time
306      * interval to wait before starting the request.
307      */
evaluate(const azure::storage_lite::retry_context & context)308     azure::storage_lite::retry_info evaluate(
309         const azure::storage_lite::retry_context& context) const override {
310       const int http_code = context.result();
311 
312       // When the response code is 0, the SDK has yet to send the initial
313       // request. Allow the request to start immediately by returning a 0-second
314       // delay.
315       if (http_code == 0) {
316         return azure::storage_lite::retry_info(true, std::chrono::seconds(0));
317       }
318 
319       // Determine if we should retry on the returned http code in the response.
320       if (!should_retry(http_code)) {
321         return azure::storage_lite::retry_info(false, std::chrono::seconds(0));
322       }
323 
324       // Wait one second before all retry attempts.
325       static const int32_t max_retries = constants::azure_max_attempts;
326       if (context.numbers() < max_retries) {
327         return azure::storage_lite::retry_info(
328             true,
329             std::chrono::seconds(constants::azure_attempt_sleep_ms / 1000));
330       }
331 
332       // All retry attempts exhausted.
333       return azure::storage_lite::retry_info(false, std::chrono::seconds(0));
334     }
335 
336    private:
337     /**
338      * Returns true if we should attempt a retry after receiving 'http_code'
339      * in the last response.
340      */
should_retry(const int http_code)341     bool should_retry(const int http_code) const {
342       // Only retry on server errors.
343       if (http_code >= 500 && http_code < 600) {
344         return true;
345       }
346 
347       return false;
348     }
349   };
350 
351   /** Contains all state associated with a block list upload transaction. */
352   class BlockListUploadState {
353    public:
BlockListUploadState()354     BlockListUploadState()
355         : next_block_id_(0)
356         , st_(Status::Ok()) {
357     }
358 
359     /* Generates the next base64-encoded block id. */
next_block_id()360     std::string next_block_id() {
361       const uint64_t block_id = next_block_id_++;
362       const std::string block_id_str = std::to_string(block_id);
363 
364       // Pad the block id string with enough leading zeros to support
365       // the maximum number of blocks (50,000). All block ids must be
366       // of equal length among a single blob.
367       const int block_id_chars = 5;
368       const std::string padded_block_id_str =
369           std::string(block_id_chars - block_id_str.length(), '0') +
370           block_id_str;
371 
372       const std::string b64_block_id_str = azure::storage_lite::to_base64(
373           reinterpret_cast<const unsigned char*>(padded_block_id_str.c_str()),
374           padded_block_id_str.size());
375 
376       block_ids_.emplace_back(b64_block_id_str);
377 
378       return b64_block_id_str;
379     }
380 
381     /* Returns all generated block ids. */
get_block_ids()382     std::list<std::string> get_block_ids() const {
383       return block_ids_;
384     }
385 
386     /* Returns the aggregate status. */
st()387     Status st() const {
388       return st_;
389     }
390 
391     /* Updates 'st_' if 'st' is non-OK */
update_st(const Status & st)392     void update_st(const Status& st) {
393       if (!st.ok()) {
394         st_ = st;
395       }
396     }
397 
398    private:
399     // The next block id to generate.
400     uint64_t next_block_id_;
401 
402     // A list of all generated block ids.
403     std::list<std::string> block_ids_;
404 
405     // The aggregate status. If any individual block
406     // upload fails, this will be in a non-OK status.
407     Status st_;
408   };
409 
410   /**
411    * A zero-copy stream buffer used as a work-around for writing
412    * a single buffer to the stream-only SDK interface.
413    */
414   class ZeroCopyStreamBuffer : public std::streambuf {
415    public:
ZeroCopyStreamBuffer(char * const buffer,std::size_t size)416     ZeroCopyStreamBuffer(char* const buffer, std::size_t size) {
417       setg(buffer, buffer, buffer + size);
418     }
419   };
420 
421   /* ********************************* */
422   /*         PRIVATE ATTRIBUTES        */
423   /* ********************************* */
424 
425   /** The VFS thread pool. */
426   ThreadPool* thread_pool_;
427 
428   /** The Azure blob storage client. */
429   tdb_shared_ptr<azure::storage_lite::blob_client> client_;
430 
431   /** Maps a blob URI to a write cache buffer. */
432   std::unordered_map<std::string, Buffer> write_cache_map_;
433 
434   /** Protects 'write_cache_map_'. */
435   std::mutex write_cache_map_lock_;
436 
437   /**  The maximum size of each value-element in 'write_cache_map_'. */
438   uint64_t write_cache_max_size_;
439 
440   /**  The maximum number of parallel requests. */
441   uint64_t max_parallel_ops_;
442 
443   /**  The target block size in a block list upload */
444   uint64_t block_list_block_size_;
445 
446   /** Whether or not to use block list upload. */
447   bool use_block_list_upload_;
448 
449   /** Maps a blob URI to its block list upload state. */
450   std::unordered_map<std::string, BlockListUploadState>
451       block_list_upload_states_;
452 
453   /** Protects 'block_list_upload_states_'. */
454   std::mutex block_list_upload_states_lock_;
455 
456   /* ********************************* */
457   /*          PRIVATE METHODS          */
458   /* ********************************* */
459 
460   /**
461    * Thread-safe fetch of the write cache buffer in `write_cache_map_`.
462    * If a buffer does not exist for `uri`, it will be created.
463    *
464    * @param uri The blob URI.
465    * @return Buffer
466    */
467   Buffer* get_write_cache_buffer(const std::string& uri);
468 
469   /**
470    * Fills the write cache buffer (given as an input `Buffer` object) from
471    * the input binary `buffer`, up until the size of the file buffer becomes
472    * `write_cache_max_size_`. It also retrieves the number of bytes filled.
473    *
474    * @param write_cache_buffer The destination write cache buffer to fill.
475    * @param buffer The source binary buffer to fill the data from.
476    * @param length The length of `buffer`.
477    * @param nbytes_filled The number of bytes filled into `write_cache_buffer`.
478    * @return Status
479    */
480   Status fill_write_cache(
481       Buffer* write_cache_buffer,
482       const void* buffer,
483       const uint64_t length,
484       uint64_t* nbytes_filled);
485 
486   /**
487    * Writes the contents of the input buffer to the blob given by
488    * the input `uri` as a new series of block uploads. Resets
489    * 'write_cache_buffer'.
490    *
491    * @param uri The blob URI.
492    * @param write_cache_buffer The input buffer to flush.
493    * @param last_block Should be true only when the flush corresponds to the
494    * last block(s) of a block list upload.
495    * @return Status
496    */
497   Status flush_write_cache(
498       const URI& uri, Buffer* write_cache_buffer, bool last_block);
499 
500   /**
501    * Writes the input buffer as an uncommitted block to Azure by issuing one
502    * or more block upload requests.
503    *
504    * @param uri The blob URI.
505    * @param buffer The input buffer.
506    * @param length The size of the input buffer.
507    * @param last_block Should be true only for the last block of a blob.
508    * @return Status
509    */
510   Status write_blocks(
511       const URI& uri, const void* buffer, uint64_t length, bool last_block);
512 
513   /**
514    * Executes and waits for a single, uncommitted block upload.
515    *
516    * @param container_name The blob's container name.
517    * @param blob_path The blob's file path relative to the container.
518    * @param buffer The input buffer holding the block's data.
519    * @param length The length of `buffer`.
520    * @param block_id A base64-encoded string that is unique to this block
521    * within the blob.
522    * @return Status
523    */
524   Status upload_block(
525       const std::string& container_name,
526       const std::string& blob_path,
527       const void* const buffer,
528       const uint64_t length,
529       const std::string& block_id);
530 
531   /**
532    * Clears all instance state related to a block list upload on 'uri'.
533    */
534   void finish_block_list_upload(const URI& uri);
535 
536   /**
537    * Uploads the write cache buffer associated with 'uri' as an entire
538    * blob.
539    */
540   Status flush_blob_direct(const URI& uri);
541 
542   /**
543    * Parses a URI into a container name and blob path. For example,
544    * URI "azure://my-container/dir1/file1" will parse into
545    * `*container_name == "my-container"` and `*blob_path == "dir1/file1"`.
546    *
547    * @param uri The URI to parse.
548    * @param container_name Mutates to the container name.
549    * @param blob_path Mutates to the blob path.
550    * @return Status
551    */
552   Status parse_azure_uri(
553       const URI& uri,
554       std::string* container_name,
555       std::string* blob_path) const;
556 
557   /**
558    * Copies the blob at 'old_uri' to `new_uri`.
559    *
560    * @param old_uri The blob's current URI.
561    * @param new_uri The URI to copy the blob to.
562    * @return Status
563    */
564   Status copy_blob(const URI& old_uri, const URI& new_uri);
565 
566   /**
567    * Waits for a blob with `container_name` and `blob_path`
568    * to exist on Azure.
569    *
570    * @param container_name The blob's container name.
571    * @param blob_path The blob's path
572    * @return Status
573    */
574   Status wait_for_blob_to_propagate(
575       const std::string& container_name, const std::string& blob_path) const;
576 
577   /**
578    * Waits for a blob with `container_name` and `blob_path`
579    * to not exist on Azure.
580    *
581    * @param container_name The blob's container name.
582    * @param blob_path The blob's path
583    * @return Status
584    */
585   Status wait_for_blob_to_be_deleted(
586       const std::string& container_name, const std::string& blob_path) const;
587 
588   /**
589    * Waits for a container with `container_name`
590    * to exist on Azure.
591    *
592    * @param container_name The container's name.
593    * @return Status
594    */
595   Status wait_for_container_to_propagate(
596       const std::string& container_name) const;
597 
598   /**
599    * Waits for a container with `container_name`
600    * to not exist on Azure.
601    *
602    * @param container_name The container's name.
603    * @return Status
604    */
605   Status wait_for_container_to_be_deleted(
606       const std::string& container_name) const;
607 
608   /**
609    * Check if 'container_name' is a container on Azure.
610    *
611    * @param container_name The container's name.
612    * @param is_container Mutates to the output.
613    * @return Status
614    */
615   Status is_container(
616       const std::string& container_name, bool* const is_container) const;
617 
618   /**
619    * Check if 'blob_path' in 'container_name' is a blob on Azure.
620    *
621    * @param container_name The blob's container name.
622    * @param blob_path The blob's path.
623    * @param is_blob Mutates to the output.
624    * @return Status
625    */
626   Status is_blob(
627       const std::string& container_name,
628       const std::string& blob_path,
629       bool* const is_blob) const;
630 
631   /**
632    * Removes a leading slash from 'path' if it exists.
633    *
634    * @param path the string to remove the leading slash from.
635    */
636   std::string remove_front_slash(const std::string& path) const;
637 
638   /**
639    * Adds a trailing slash to 'path' if it doesn't already have one.
640    *
641    * @param path the string to add the trailing slash to.
642    */
643   std::string add_trailing_slash(const std::string& path) const;
644 
645   /**
646    * Removes a trailing slash from 'path' if it exists.
647    *
648    * @param path the string to remove the trailing slash from.
649    */
650   std::string remove_trailing_slash(const std::string& path) const;
651 };
652 
653 }  // namespace sm
654 }  // namespace tiledb
655 
656 #endif  // HAVE_AZURE
657 #endif  // TILEDB_AZURE_H
658