# -*- coding: utf-8 -*-
# © Copyright EnterpriseDB UK Limited 2018-2021
#
# This file is part of Barman.
#
# Barman is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# Barman is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with Barman.  If not, see <http://www.gnu.org/licenses/>

import bz2
import gzip
import logging
import os
import shutil
from io import BytesIO, RawIOBase

from barman.cloud import CloudInterface, CloudProviderError

try:
    # Python 3.x
    from urllib.parse import urlparse
except ImportError:
    # Python 2.x
    from urlparse import urlparse

try:
    from azure.storage.blob import (
        BlobPrefix,
        ContainerClient,
        PartialBatchErrorException,
    )
    from azure.core.exceptions import (
        HttpResponseError,
        ResourceNotFoundError,
        ServiceRequestError,
    )
except ImportError:
    raise SystemExit("Missing required python module: azure-storage-blob")

# Domain for azure blob URIs
# See https://docs.microsoft.com/en-us/rest/api/storageservices/naming-and-referencing-containers--blobs--and-metadata#resource-uri-syntax
AZURE_BLOB_STORAGE_DOMAIN = "blob.core.windows.net"


class StreamingBlobIO(RawIOBase):
    """
    Wrap an azure-storage-blob StorageStreamDownloader in the IOBase API.

    Inherits the IOBase defaults of seekable() -> False and writable() -> False.
    """
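    # A minimal usage sketch (the names below are illustrative): the
    # StorageStreamDownloader returned by ContainerClient.download_blob can be
    # wrapped so that file-like consumers such as gzip.GzipFile read from it
    # incrementally instead of buffering the whole blob in memory. This is how
    # download_file and remote_open below consume it:
    #
    #   downloader = container_client.download_blob("wals/000000010000000000000001.gz")
    #   stream = StreamingBlobIO(downloader)
    #   with gzip.GzipFile(fileobj=stream, mode="rb") as source:
    #       first_chunk = source.read(8192)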

    def __init__(self, blob):
        self._chunks = blob.chunks()
        self._current_chunk = BytesIO()

    def readable(self):
        return True

    def read(self, n=1):
        """
        Read at most n bytes from the stream.

        Fetches new chunks from the StorageStreamDownloader until the requested
        number of bytes have been read.

        :param int n: Number of bytes to read from the stream
        :return: Up to n bytes from the stream
        :rtype: bytes
        """
        n = None if n < 0 else n
        blob_bytes = self._current_chunk.read(n)
        bytes_count = len(blob_bytes)
        try:
            # Keep fetching chunks until the requested number of bytes has been
            # read, or until the chunks are exhausted when n is None (read all).
            while n is None or bytes_count < n:
                self._current_chunk = BytesIO(self._chunks.next())
                new_blob_bytes = self._current_chunk.read(
                    None if n is None else n - bytes_count
                )
                bytes_count += len(new_blob_bytes)
                blob_bytes += new_blob_bytes
        except StopIteration:
            pass
        return blob_bytes


class AzureCloudInterface(CloudInterface):
    # Azure block blob limitations
    # https://docs.microsoft.com/en-us/rest/api/storageservices/understanding-block-blobs--append-blobs--and-page-blobs
    MAX_CHUNKS_PER_FILE = 50000
    # Minimum block size allowed in Azure Blob Storage is 64KB
    MIN_CHUNK_SIZE = 64 << 10

    # Azure Blob Storage permits a maximum of 4.75TB per file.
    # This is a hard limit, while our upload procedure can go over the
    # specified MAX_ARCHIVE_SIZE, so we set a maximum of 1TB per file.
    MAX_ARCHIVE_SIZE = 1 << 40

    def __init__(self, url, jobs=2, encryption_scope=None):
        """
        Create a new Azure Blob Storage interface given the supplied account URL

        :param str url: Full URL of the cloud destination/source
        :param int jobs: How many sub-processes to use for asynchronous
          uploading, defaults to 2.
        :param str|None encryption_scope: The encryption scope to use when
          uploading objects, if any.
        """
        super(AzureCloudInterface, self).__init__(
            url=url,
            jobs=jobs,
        )
        self.encryption_scope = encryption_scope

        parsed_url = urlparse(url)
        if parsed_url.netloc.endswith(AZURE_BLOB_STORAGE_DOMAIN):
            # We have an Azure Storage URI so we use the following form:
            # <http|https>://<account-name>.<service-name>.core.windows.net/<resource-path>
            # where <resource-path> is <container>/<blob>.
            # Note that although Azure supports an implicit root container, we require
            # that the container is always included.
            self.account_url = parsed_url.netloc
            try:
                self.bucket_name = parsed_url.path.split("/")[1]
            except IndexError:
                raise ValueError("azure blob storage URL %s is malformed" % url)
            path = parsed_url.path.split("/")[2:]
        else:
            # We are dealing with emulated storage so we use the following form:
            # http://<local-machine-address>:<port>/<account-name>/<resource-path>
            logging.info("Using emulated storage URL: %s" % url)
            if "AZURE_STORAGE_CONNECTION_STRING" not in os.environ:
                raise ValueError(
                    "A connection string must be provided when using emulated storage"
                )
            try:
                self.bucket_name = parsed_url.path.split("/")[2]
            except IndexError:
                raise ValueError("emulated storage URL %s is malformed" % url)
            path = parsed_url.path.split("/")[3:]

        self.path = "/".join(path)
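        # For illustration (the account and container names here are made up):
        # given the URL "https://myaccount.blob.core.windows.net/mycontainer/main",
        # the parsing above yields account_url "myaccount.blob.core.windows.net",
        # bucket_name "mycontainer" and path "main".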

        self.bucket_exists = None
        self._reinit_session()

    def _reinit_session(self):
        """
        Create a new session
        """
        if "AZURE_STORAGE_CONNECTION_STRING" in os.environ:
            logging.info("Authenticating to Azure with connection string")
            self.container_client = ContainerClient.from_connection_string(
                conn_str=os.getenv("AZURE_STORAGE_CONNECTION_STRING"),
                container_name=self.bucket_name,
            )
        else:
            if "AZURE_STORAGE_SAS_TOKEN" in os.environ:
                logging.info("Authenticating to Azure with SAS token")
                credential = os.getenv("AZURE_STORAGE_SAS_TOKEN")
            elif "AZURE_STORAGE_KEY" in os.environ:
                logging.info("Authenticating to Azure with shared key")
                credential = os.getenv("AZURE_STORAGE_KEY")
            else:
                logging.info("Authenticating to Azure with default credentials")
                # azure-identity is not part of azure-storage-blob so only import
                # it if needed
                try:
                    from azure.identity import DefaultAzureCredential
                except ImportError:
                    raise SystemExit("Missing required python module: azure-identity")
                credential = DefaultAzureCredential()
            self.container_client = ContainerClient(
                account_url=self.account_url,
                container_name=self.bucket_name,
                credential=credential,
            )
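
    # Credential selection in _reinit_session follows a simple precedence:
    # AZURE_STORAGE_CONNECTION_STRING wins, then AZURE_STORAGE_SAS_TOKEN, then
    # AZURE_STORAGE_KEY, falling back to DefaultAzureCredential. A hedged usage
    # sketch (the values and names below are made up):
    #
    #   export AZURE_STORAGE_SAS_TOKEN="sv=2020-08-04&ss=b&sig=..."
    #   # then, from Python:
    #   cloud_interface = AzureCloudInterface(
    #       url="https://myaccount.blob.core.windows.net/mycontainer/main"
    #   )
    #   cloud_interface.test_connectivity()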

    @property
    def _extra_upload_args(self):
        optional_args = {}
        if self.encryption_scope:
            optional_args["encryption_scope"] = self.encryption_scope
        return optional_args

    def test_connectivity(self):
        """
        Test Azure connectivity by trying to access a container
        """
        try:
            # We are not even interested in the existence of the bucket,
            # we just want to see if Azure blob service is reachable.
            self.bucket_exists = self._check_bucket_existence()
            return True
        except (HttpResponseError, ServiceRequestError) as exc:
            logging.error("Can't connect to cloud provider: %s", exc)
            return False

    def _check_bucket_existence(self):
        """
        Check Azure Blob Storage for the target container

        Although there is an `exists` function, it cannot be called with
        container-level shared access tokens. We therefore check for existence
        by calling list_blobs on the container.

        :return: True if the container exists, False otherwise
        :rtype: bool
        """
        try:
            self.container_client.list_blobs().next()
        except ResourceNotFoundError:
            return False
        except StopIteration:
            # The bucket is empty but it does exist
            pass
        return True

    def _create_bucket(self):
        """
        Create the container in cloud storage
        """
        # By default public access is disabled for newly created containers.
        # Unlike S3 there is no concept of regions for containers (this is at
        # the storage account level in Azure)
        self.container_client.create_container()

    def _walk_blob_tree(self, obj, ignore=None):
        """
        Walk a blob tree in a directory-like manner and return a list of
        directories and files.

        :param ItemPaged[BlobProperties] obj: Iterable response of BlobProperties
          obtained from ContainerClient.walk_blobs
        :param str|None ignore: An entry to be excluded from the returned list,
          typically the top level prefix
        :return: List of objects and directories in the tree
        :rtype: List[str]
        """
        if obj.name != ignore:
            yield obj.name
        if isinstance(obj, BlobPrefix):
            # We are a prefix and not a leaf so iterate children
            for child in obj:
                for v in self._walk_blob_tree(child):
                    yield v

    def list_bucket(self, prefix="", delimiter="/"):
        """
        List bucket content in a directory-like manner

        :param str prefix:
        :param str delimiter:
        :return: List of objects and dirs right under the prefix
        :rtype: List[str]
        """
        res = self.container_client.walk_blobs(
            name_starts_with=prefix, delimiter=delimiter
        )
        return self._walk_blob_tree(res, ignore=prefix)
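
    # For example (hypothetical blob layout): if the container holds the blob
    # "main/base/backup.info", then list_bucket("main/", delimiter="/") would
    # yield the directory entry "main/base/" followed by the blob name
    # "main/base/backup.info", since _walk_blob_tree recurses into each
    # BlobPrefix it encounters.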

    def download_file(self, key, dest_path, decompress=None):
        """
        Download a file from Azure Blob Storage

        :param str key: The key to download
        :param str dest_path: Where to put the destination file
        :param str|None decompress: Compression scheme to use for decompression
        """
        obj = self.container_client.download_blob(key)
        with open(dest_path, "wb") as dest_file:
            if not decompress:
                obj.download_to_stream(dest_file)
                return
            blob = StreamingBlobIO(obj)
            if decompress == "gzip":
                source_file = gzip.GzipFile(fileobj=blob, mode="rb")
            elif decompress == "bzip2":
                source_file = bz2.BZ2File(blob, "rb")
            with source_file:
                shutil.copyfileobj(source_file, dest_file)

    def remote_open(self, key):
        """
        Open a remote Azure Blob Storage object and return a readable stream

        :param str key: The key identifying the object to open
        :return: A file-like object from which the stream can be read or None if
          the key does not exist
        """
        try:
            obj = self.container_client.download_blob(key)
            return StreamingBlobIO(obj)
        except ResourceNotFoundError:
            return None

    def upload_fileobj(self, fileobj, key):
        """
        Synchronously upload the content of a file-like object to a cloud key

        :param IOBase fileobj: File-like object to upload
        :param str key: The key to identify the uploaded object
        """
        self.container_client.upload_blob(
            name=key, data=fileobj, overwrite=True, **self._extra_upload_args
        )

    def create_multipart_upload(self, key):
        """No-op method because Azure has no concept of multipart uploads

        Instead of multipart upload, blob blocks are staged and then committed.
        However this does not require anything to be created up front.
        This method therefore does nothing.
        """
        pass

    def _upload_part(self, upload_metadata, key, body, part_number):
        """
        Upload a single block of this block blob.

        Uses the supplied part number to generate the block ID and returns it
        as the "PartNumber" in the part metadata.

        :param dict upload_metadata: Provider-specific metadata about the upload
          (not used in Azure)
        :param str key: The key to use in the cloud service
        :param object body: A stream-like object to upload
        :param int part_number: Part number, starting from 1
        :return: The part metadata
        :rtype: dict[str, None|str]
        """
        # Block IDs must be the same length for all blocks in the blob
        # and no greater than 64 characters. Given there is a limit of
        # 50000 blocks per blob we zero-pad the part_number to five
        # places.
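        # For example, part_number 1 becomes block ID "00001" and
        # part_number 42 becomes "00042".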
        block_id = str(part_number).zfill(5)
        blob_client = self.container_client.get_blob_client(key)
        blob_client.stage_block(block_id, body, **self._extra_upload_args)
        return {"PartNumber": block_id}

    def _complete_multipart_upload(self, upload_metadata, key, parts):
        """
        Finish a "multipart upload" by committing all blocks in the blob.

        :param dict upload_metadata: Provider-specific metadata about the upload
          (not used in Azure)
        :param str key: The key to use in the cloud service
        :param parts: The list of block IDs for the blocks which compose this blob
        """
        blob_client = self.container_client.get_blob_client(key)
        block_list = [part["PartNumber"] for part in parts]
        blob_client.commit_block_list(block_list, **self._extra_upload_args)

    def _abort_multipart_upload(self, upload_metadata, key):
        """
        Abort the upload of a block blob

        The objective of this method is to clean up any dangling resources - in
        this case those resources are uncommitted blocks.

        :param dict upload_metadata: Provider-specific metadata about the upload
          (not used in Azure)
        :param str key: The key to use in the cloud service
        """
        # Ideally we would clean up uncommitted blocks at this point
        # however there is no way of doing that.
        # Uncommitted blocks will be discarded after 7 days or when
        # the blob is committed (if they're not included in the commit).
        # We therefore create an empty blob (thereby discarding all uploaded
        # blocks for that blob) and then delete it.
        blob_client = self.container_client.get_blob_client(key)
        blob_client.commit_block_list([], **self._extra_upload_args)
        blob_client.delete_blob()

    def delete_objects(self, paths):
        """
        Delete the objects at the specified paths

        :param List[str] paths:
        """
        try:
            # If paths is empty because the files have already been deleted then
            # delete_blobs will return successfully, so we just call it with
            # whatever we were given.
            responses = self.container_client.delete_blobs(*paths)
        except PartialBatchErrorException as exc:
            # Although the docs imply any errors will be returned in the response
            # object, in practice a PartialBatchErrorException is raised which contains
            # the response objects in its `parts` attribute.
            # We therefore set responses to reference the response in the exception and
            # treat it the same way we would a regular response.
            logging.warning(
                "PartialBatchErrorException received from Azure: %s" % exc.message
            )
            responses = exc.parts

        # responses is an iterator of HttpResponse objects, so we check the
        # status codes, which should all be 202 if the deletions succeeded.
        errors = False
        for resp in responses:
            if resp.status_code == 404:
                logging.warning(
                    "Deletion of object %s failed because it could not be found"
                    % resp.request.url
                )
            elif resp.status_code != 202:
                errors = True
                logging.error(
                    'Deletion of object %s failed with error code: "%s"'
                    % (resp.request.url, resp.status_code)
                )

        if errors:
            raise CloudProviderError(
                "Error from cloud provider while deleting objects - "
                "please check the Barman logs"
            )
