# -*- coding: utf-8 -*-
# © Copyright EnterpriseDB UK Limited 2018-2021
#
# This file is part of Barman.
#
# Barman is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# Barman is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with Barman. If not, see <http://www.gnu.org/licenses/>

import bz2
import gzip
import logging
import os
import shutil
from io import BytesIO, RawIOBase

from barman.cloud import CloudInterface, CloudProviderError

try:
    # Python 3.x
    from urllib.parse import urlparse
except ImportError:
    # Python 2.x
    from urlparse import urlparse

try:
    from azure.storage.blob import (
        BlobPrefix,
        ContainerClient,
        PartialBatchErrorException,
    )
    from azure.core.exceptions import (
        HttpResponseError,
        ResourceNotFoundError,
        ServiceRequestError,
    )
except ImportError:
    raise SystemExit("Missing required python module: azure-storage-blob")

# Domain for azure blob URIs
# See https://docs.microsoft.com/en-us/rest/api/storageservices/naming-and-referencing-containers--blobs--and-metadata#resource-uri-syntax
AZURE_BLOB_STORAGE_DOMAIN = "blob.core.windows.net"


class StreamingBlobIO(RawIOBase):
    """
    Wrap an azure-storage-blob StorageStreamDownloader in the IOBase API.

    Inherits the IOBase defaults of seekable() -> False and writable() -> False.
    """

    def __init__(self, blob):
        self._chunks = blob.chunks()
        self._current_chunk = BytesIO()

    def readable(self):
        return True

    def read(self, n=1):
        """
        Read at most n bytes from the stream.

        Fetches new chunks from the StorageStreamDownloader until the requested
        number of bytes have been read.

        :param int n: Number of bytes to read from the stream
        :return: Up to n bytes from the stream
        :rtype: bytes
        """
        n = None if n < 0 else n
        blob_bytes = self._current_chunk.read(n)
        bytes_count = len(blob_bytes)
        try:
            while bytes_count < n:
                self._current_chunk = BytesIO(self._chunks.next())
                new_blob_bytes = self._current_chunk.read(n - bytes_count)
                bytes_count += len(new_blob_bytes)
                blob_bytes += new_blob_bytes
        except StopIteration:
            pass
        return blob_bytes
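
# Illustrative sketch only: StreamingBlobIO exposes a StorageStreamDownloader
# as a readable file object so it can be wrapped by the gzip/bz2 decompressors
# (this mirrors how AzureCloudInterface.download_file below uses it). The blob
# key and variable names in this example are hypothetical.
#
#   downloader = container_client.download_blob("server/wals/000000010000000000000001.gz")
#   stream = StreamingBlobIO(downloader)
#   with gzip.GzipFile(fileobj=stream, mode="rb") as source:
#       shutil.copyfileobj(source, dest_file)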


class AzureCloudInterface(CloudInterface):
    # Azure block blob limitations
    # https://docs.microsoft.com/en-us/rest/api/storageservices/understanding-block-blobs--append-blobs--and-page-blobs
    MAX_CHUNKS_PER_FILE = 50000
    # Minimum block size allowed in Azure Blob Storage is 64KB
    MIN_CHUNK_SIZE = 64 << 10

    # Azure Blob Storage permits a maximum of 4.75TB per file.
    # This is a hard limit, while our upload procedure can go over the specified
    # MAX_ARCHIVE_SIZE - so we set a maximum of 1TB per file
    MAX_ARCHIVE_SIZE = 1 << 40

    def __init__(self, url, jobs=2, encryption_scope=None):
        """
        Create a new Azure Blob Storage interface given the supplied account url

        :param str url: Full URL of the cloud destination/source
        :param int jobs: How many sub-processes to use for asynchronous
            uploading, defaults to 2.
        :param str|None encryption_scope: The encryption scope to apply to
            uploaded blobs, if any.
        """
        super(AzureCloudInterface, self).__init__(
            url=url,
            jobs=jobs,
        )
        self.encryption_scope = encryption_scope

        parsed_url = urlparse(url)
        if parsed_url.netloc.endswith(AZURE_BLOB_STORAGE_DOMAIN):
            # We have an Azure Storage URI so we use the following form:
            # <http|https>://<account-name>.<service-name>.core.windows.net/<resource-path>
            # where <resource-path> is <container>/<blob>.
            # Note that although Azure supports an implicit root container, we require
            # that the container is always included.
            self.account_url = parsed_url.netloc
            try:
                self.bucket_name = parsed_url.path.split("/")[1]
            except IndexError:
                raise ValueError("azure blob storage URL %s is malformed" % url)
            path = parsed_url.path.split("/")[2:]
        else:
            # We are dealing with emulated storage so we use the following form:
            # http://<local-machine-address>:<port>/<account-name>/<resource-path>
            logging.info("Using emulated storage URL: %s " % url)
            if "AZURE_STORAGE_CONNECTION_STRING" not in os.environ:
                raise ValueError(
                    "A connection string must be provided when using emulated storage"
                )
            try:
                self.bucket_name = parsed_url.path.split("/")[2]
            except IndexError:
                raise ValueError("emulated storage URL %s is malformed" % url)
            path = parsed_url.path.split("/")[3:]

        self.path = "/".join(path)

        self.bucket_exists = None
        self._reinit_session()
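
    # Illustrative examples of the two accepted URL forms (the account,
    # container and path names below are hypothetical):
    #
    #   https://myaccount.blob.core.windows.net/mycontainer/server/wals
    #       -> account_url = "myaccount.blob.core.windows.net"
    #          bucket_name = "mycontainer", path = "server/wals"
    #
    #   http://127.0.0.1:10000/devstoreaccount1/mycontainer/server/wals
    #       -> bucket_name = "mycontainer", path = "server/wals"
    #          (emulated storage, so AZURE_STORAGE_CONNECTION_STRING must be set)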

    def _reinit_session(self):
        """
        Create a new session
        """
        if "AZURE_STORAGE_CONNECTION_STRING" in os.environ:
            logging.info("Authenticating to Azure with connection string")
            self.container_client = ContainerClient.from_connection_string(
                conn_str=os.getenv("AZURE_STORAGE_CONNECTION_STRING"),
                container_name=self.bucket_name,
            )
        else:
            if "AZURE_STORAGE_SAS_TOKEN" in os.environ:
                logging.info("Authenticating to Azure with SAS token")
                credential = os.getenv("AZURE_STORAGE_SAS_TOKEN")
            elif "AZURE_STORAGE_KEY" in os.environ:
                logging.info("Authenticating to Azure with shared key")
                credential = os.getenv("AZURE_STORAGE_KEY")
            else:
                logging.info("Authenticating to Azure with default credentials")
                # azure-identity is not part of azure-storage-blob so only import
                # it if needed
                try:
                    from azure.identity import DefaultAzureCredential
                except ImportError:
                    raise SystemExit("Missing required python module: azure-identity")
                credential = DefaultAzureCredential()
            self.container_client = ContainerClient(
                account_url=self.account_url,
                container_name=self.bucket_name,
                credential=credential,
            )
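
    # Credential resolution above is, in order of precedence: connection
    # string, SAS token, shared key, then DefaultAzureCredential. For example
    # (the token value is a placeholder, shown for illustration only):
    #
    #   export AZURE_STORAGE_SAS_TOKEN="sv=2020-08-04&ss=b&srt=co&sp=rl&sig=..."
    #
    # would select the SAS token branch, provided that
    # AZURE_STORAGE_CONNECTION_STRING is not also set.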

    @property
    def _extra_upload_args(self):
        optional_args = {}
        if self.encryption_scope:
            optional_args["encryption_scope"] = self.encryption_scope
        return optional_args

    def test_connectivity(self):
        """
        Test Azure connectivity by trying to access a container
        """
        try:
            # We are not even interested in the existence of the bucket,
            # we just want to see if Azure blob service is reachable.
            self.bucket_exists = self._check_bucket_existence()
            return True
        except (HttpResponseError, ServiceRequestError) as exc:
            logging.error("Can't connect to cloud provider: %s", exc)
            return False

    def _check_bucket_existence(self):
        """
        Check Azure Blob Storage for the target container

        Although there is an `exists` function, it cannot be called by
        container-level shared access tokens. We therefore check for existence
        by calling list_blobs on the container.

        :return: True if the container exists, False otherwise
        :rtype: bool
        """
        try:
            self.container_client.list_blobs().next()
        except ResourceNotFoundError:
            return False
        except StopIteration:
            # The bucket is empty but it does exist
            pass
        return True

    def _create_bucket(self):
        """
        Create the container in cloud storage
        """
        # By default public access is disabled for newly created containers.
        # Unlike S3 there is no concept of regions for containers (this is at
        # the storage account level in Azure)
        self.container_client.create_container()

    def _walk_blob_tree(self, obj, ignore=None):
        """
        Walk a blob tree in a directory manner and return a list of directories
        and files.

        :param ItemPaged[BlobProperties] obj: Iterable response of BlobProperties
            obtained from ContainerClient.walk_blobs
        :param str|None ignore: An entry to be excluded from the returned list,
            typically the top level prefix
        :return: List of objects and directories in the tree
        :rtype: List[str]
        """
        if obj.name != ignore:
            yield obj.name
        if isinstance(obj, BlobPrefix):
            # We are a prefix and not a leaf so iterate children
            for child in obj:
                for v in self._walk_blob_tree(child):
                    yield v

    def list_bucket(self, prefix="", delimiter="/"):
        """
        List bucket content in a directory manner

        :param str prefix:
        :param str delimiter:
        :return: List of objects and dirs right under the prefix
        :rtype: List[str]
        """
        res = self.container_client.walk_blobs(
            name_starts_with=prefix, delimiter=delimiter
        )
        return self._walk_blob_tree(res, ignore=prefix)

    def download_file(self, key, dest_path, decompress=None):
        """
        Download a file from Azure Blob Storage

        :param str key: The key to download
        :param str dest_path: Where to put the destination file
        :param str|None decompress: Compression scheme to use for decompression
        """
        obj = self.container_client.download_blob(key)
        with open(dest_path, "wb") as dest_file:
            if not decompress:
                obj.download_to_stream(dest_file)
                return
            blob = StreamingBlobIO(obj)
            if decompress == "gzip":
                source_file = gzip.GzipFile(fileobj=blob, mode="rb")
            elif decompress == "bzip2":
                source_file = bz2.BZ2File(blob, "rb")
            with source_file:
                shutil.copyfileobj(source_file, dest_file)

    def remote_open(self, key):
        """
        Open a remote Azure Blob Storage object and return a readable stream

        :param str key: The key identifying the object to open
        :return: A file-like object from which the stream can be read or None if
            the key does not exist
        """
        try:
            obj = self.container_client.download_blob(key)
            return StreamingBlobIO(obj)
        except ResourceNotFoundError:
            return None

    def upload_fileobj(self, fileobj, key):
        """
        Synchronously upload the content of a file-like object to a cloud key

        :param fileobj IOBase: File-like object to upload
        :param str key: The key to identify the uploaded object
        """
        self.container_client.upload_blob(
            name=key, data=fileobj, overwrite=True, **self._extra_upload_args
        )

    def create_multipart_upload(self, key):
        """
        No-op method because Azure has no concept of multipart uploads

        Instead of multipart upload, blob blocks are staged and then committed.
        However this does not require anything to be created up front,
        so this method does nothing.
        """
        pass

    def _upload_part(self, upload_metadata, key, body, part_number):
        """
        Upload a single block of this block blob.

        Uses the supplied part number to generate the block ID and returns it
        as the "PartNumber" in the part metadata.

        :param dict upload_metadata: Provider-specific metadata about the upload
            (not used in Azure)
        :param str key: The key to use in the cloud service
        :param object body: A stream-like object to upload
        :param int part_number: Part number, starting from 1
        :return: The part metadata
        :rtype: dict[str, None|str]
        """
        # Block IDs must be the same length for all blocks in the blob
        # and no greater than 64 characters. Given there is a limit of
        # 50000 blocks per blob we zero-pad the part_number to five
        # places.
        block_id = str(part_number).zfill(5)
        blob_client = self.container_client.get_blob_client(key)
        blob_client.stage_block(block_id, body, **self._extra_upload_args)
        return {"PartNumber": block_id}

    def _complete_multipart_upload(self, upload_metadata, key, parts):
        """
        Finish a "multipart upload" by committing all blocks in the blob.

        :param dict upload_metadata: Provider-specific metadata about the upload
            (not used in Azure)
        :param str key: The key to use in the cloud service
        :param parts: The list of block IDs for the blocks which compose this blob
        """
        blob_client = self.container_client.get_blob_client(key)
        block_list = [part["PartNumber"] for part in parts]
        blob_client.commit_block_list(block_list, **self._extra_upload_args)
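
    # Sketch of the block blob upload flow implemented by the methods above
    # and by _abort_multipart_upload below (the block IDs show the zero-padded
    # form produced by _upload_part; the part bodies are placeholders):
    #
    #   blob_client.stage_block("00001", first_part)        # _upload_part(part_number=1)
    #   blob_client.stage_block("00002", second_part)       # _upload_part(part_number=2)
    #   blob_client.commit_block_list(["00001", "00002"])   # _complete_multipart_upload
    #
    # On abort, an empty block list is committed and the resulting blob is
    # deleted so that any staged-but-uncommitted blocks are discarded.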

    def _abort_multipart_upload(self, upload_metadata, key):
        """
        Abort the upload of a block blob

        The objective of this method is to clean up any dangling resources - in
        this case those resources are uncommitted blocks.

        :param dict upload_metadata: Provider-specific metadata about the upload
            (not used in Azure)
        :param str key: The key to use in the cloud service
        """
        # Ideally we would clean up uncommitted blocks at this point
        # however there is no way of doing that.
        # Uncommitted blocks will be discarded after 7 days or when
        # the blob is committed (if they're not included in the commit).
        # We therefore create an empty blob (thereby discarding all uploaded
        # blocks for that blob) and then delete it.
        blob_client = self.container_client.get_blob_client(key)
        blob_client.commit_block_list([], **self._extra_upload_args)
        blob_client.delete_blob()

    def delete_objects(self, paths):
        """
        Delete the objects at the specified paths

        :param List[str] paths:
        """
        try:
            # If paths is empty because the files have already been deleted
            # then delete_blobs will return successfully, so we just call it
            # with whatever we were given.
            responses = self.container_client.delete_blobs(*paths)
        except PartialBatchErrorException as exc:
            # Although the docs imply any errors will be returned in the response
            # object, in practice a PartialBatchErrorException is raised which
            # contains the response objects in its `parts` attribute.
            # We therefore set responses to reference the responses in the
            # exception and treat them the same way we would a regular response.
            logging.warning(
                "PartialBatchErrorException received from Azure: %s" % exc.message
            )
            responses = exc.parts

        # responses is an iterator of HttpResponse objects, so we check the
        # status codes, which should all be 202 if the deletions succeeded.
        errors = False
        for resp in responses:
            if resp.status_code == 404:
                logging.warning(
                    "Deletion of object %s failed because it could not be found"
                    % resp.request.url
                )
            elif resp.status_code != 202:
                errors = True
                logging.error(
                    'Deletion of object %s failed with error code: "%s"'
                    % (resp.request.url, resp.status_code)
                )

        if errors:
            raise CloudProviderError(
                "Error from cloud provider while deleting objects - "
                "please check the Barman logs"
            )
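

# Illustrative sketch of how this interface might be constructed (the URL and
# container below are hypothetical; the barman-cloud scripts build the URL
# from their command line arguments):
#
#   cloud_interface = AzureCloudInterface(
#       url="https://myaccount.blob.core.windows.net/mycontainer/pgserver",
#       jobs=2,
#   )
#   if not cloud_interface.test_connectivity():
#       raise SystemExit("Cannot connect to Azure Blob Storage")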