# -*- coding: utf-8 -*-
# © Copyright EnterpriseDB UK Limited 2018-2021
#
# This file is part of Barman.
#
# Barman is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# Barman is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with Barman. If not, see <http://www.gnu.org/licenses/>

import bz2
import gzip
import logging
import shutil
from io import RawIOBase

from barman.cloud import CloudInterface, CloudProviderError

try:
    # Python 3.x
    from urllib.parse import urlparse
except ImportError:
    # Python 2.x
    from urlparse import urlparse

try:
    import boto3
    from botocore.exceptions import ClientError, EndpointConnectionError
except ImportError:
    raise SystemExit("Missing required python module: boto3")


class StreamingBodyIO(RawIOBase):
    """
    Wrap a boto StreamingBody in the IOBase API.
    """

    def __init__(self, body):
        self.body = body

    def readable(self):
        return True

    def read(self, n=-1):
        # boto3's StreamingBody expects None, rather than a negative size,
        # to mean "read the remaining content"
        n = None if n < 0 else n
        return self.body.read(n)

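# A minimal usage sketch of StreamingBodyIO (illustrative only: the bucket and
# key below are made up and this is not part of Barman's public interface).
# Wrapping a boto3 StreamingBody in StreamingBodyIO yields an IOBase-compatible
# object, so standard library helpers such as gzip.GzipFile and
# shutil.copyfileobj can consume the remote stream directly:
#
#   body = boto3.resource("s3").Object("my-bucket", "path/to/key.gz").get()["Body"]
#   with gzip.GzipFile(fileobj=StreamingBodyIO(body), mode="rb") as src:
#       with open("local_copy", "wb") as dst:
#           shutil.copyfileobj(src, dst)
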

class S3CloudInterface(CloudInterface):
    # S3 multipart upload limitations
    # http://docs.aws.amazon.com/AmazonS3/latest/API/mpUploadUploadPart.html
    MAX_CHUNKS_PER_FILE = 10000
    MIN_CHUNK_SIZE = 5 << 20

    # S3 permits a maximum of 5TB per file
    # https://docs.aws.amazon.com/AmazonS3/latest/dev/UploadingObjects.html
    # This is a hard limit, while our upload procedure can go over the
    # specified MAX_ARCHIVE_SIZE - so we set a maximum of 1TB per file
    MAX_ARCHIVE_SIZE = 1 << 40

    def __getstate__(self):
        state = self.__dict__.copy()
        # Remove the boto3 client reference from the state as it cannot be
        # pickled in Python >= 3.8 and multiprocessing will pickle the object
        # when the worker processes are created.
        # The worker processes create their own boto3 sessions so do not need
        # the boto3 session from the parent process.
        del state["s3"]
        return state

    def __setstate__(self, state):
        self.__dict__.update(state)

    def __init__(
        self, url, encryption=None, jobs=2, profile_name=None, endpoint_url=None
    ):
        """
        Create a new S3 interface given the S3 destination url and the profile
        name

        :param str url: Full URL of the cloud destination/source
        :param str|None encryption: Encryption type string
        :param int jobs: How many sub-processes to use for asynchronous
          uploading, defaults to 2.
        :param str profile_name: Amazon auth profile identifier
        :param str endpoint_url: override default endpoint detection strategy
          with this one
        """
        super(S3CloudInterface, self).__init__(
            url=url,
            jobs=jobs,
        )
        self.profile_name = profile_name
        self.encryption = encryption
        self.endpoint_url = endpoint_url

        # Extract information from the destination URL
        parsed_url = urlparse(url)
        # If netloc is not present, the s3 url is badly formatted.
        if parsed_url.netloc == "" or parsed_url.scheme != "s3":
            raise ValueError("Invalid s3 URL address: %s" % url)
        self.bucket_name = parsed_url.netloc
        self.bucket_exists = None
        self.path = parsed_url.path.lstrip("/")

        # Build a session, so we can extract the correct resource
        self._reinit_session()

    def _reinit_session(self):
        """
        Create a new session
        """
        session = boto3.Session(profile_name=self.profile_name)
        self.s3 = session.resource("s3", endpoint_url=self.endpoint_url)

    @property
    def _extra_upload_args(self):
        """
        Return a dict containing ExtraArgs to be passed to certain boto3 calls

        Because some boto3 methods accept an `ExtraArgs` dict while others
        take the same keys as top-level keyword arguments, we return a plain
        dict which can either be passed as `ExtraArgs` or expanded with `**`
        in the boto3 call.
        """
        additional_args = {}
        if self.encryption:
            additional_args["ServerSideEncryption"] = self.encryption
        return additional_args

    def test_connectivity(self):
        """
        Test AWS connectivity by trying to access a bucket
        """
        try:
            # We are not interested in the existence of the bucket here,
            # we just want to check whether AWS is reachable
            self.bucket_exists = self._check_bucket_existence()
            return True
        except EndpointConnectionError as exc:
            logging.error("Can't connect to cloud provider: %s", exc)
            return False

    def _check_bucket_existence(self):
        """
        Check cloud storage for the target bucket

        :return: True if the bucket exists, False otherwise
        :rtype: bool
        """
        try:
            # Search the bucket on s3
            self.s3.meta.client.head_bucket(Bucket=self.bucket_name)
            return True
        except ClientError as exc:
            # If a client error is thrown, check the error code.
            # A 404 error code means the bucket does not exist.
            error_code = exc.response["Error"]["Code"]
            if error_code == "404":
                return False
            # Otherwise there is nothing we can do other than re-raise the
            # original exception
            raise

    def _create_bucket(self):
        """
        Create the bucket in cloud storage
        """
        # Get the current region from the client.
        # Do not use session.region_name here because it may be None.
        region = self.s3.meta.client.meta.region_name
        logging.info(
            "Bucket '%s' does not exist, creating it on region '%s'",
            self.bucket_name,
            region,
        )
        create_bucket_config = {
            "ACL": "private",
        }
        # The location constraint is required during bucket creation for all
        # regions outside of us-east-1. It cannot be specified in us-east-1;
        # doing so results in a failure, so we only add it when deploying
        # outside of us-east-1.
        # See https://github.com/boto/boto3/issues/125
        if region != "us-east-1":
            create_bucket_config["CreateBucketConfiguration"] = {
                "LocationConstraint": region,
            }
        self.s3.Bucket(self.bucket_name).create(**create_bucket_config)

    def list_bucket(self, prefix="", delimiter="/"):
        """
        List bucket content in a directory manner

        :param str prefix: Prefix used to filter the object keys
        :param str delimiter: Delimiter used to emulate a directory hierarchy
        :return: List of objects and dirs right under the prefix
        :rtype: Iterator[str]
        """
        if prefix.startswith(delimiter):
            prefix = prefix.lstrip(delimiter)

        res = self.s3.meta.client.list_objects_v2(
            Bucket=self.bucket_name, Prefix=prefix, Delimiter=delimiter
        )

        # List "folders"
        keys = res.get("CommonPrefixes")
        if keys is not None:
            for k in keys:
                yield k.get("Prefix")

        # List "files"
        objects = res.get("Contents")
        if objects is not None:
            for o in objects:
                yield o.get("Key")

    def download_file(self, key, dest_path, decompress):
        """
        Download a file from S3

        :param str key: The S3 key to download
        :param str dest_path: Where to put the destination file
        :param str|None decompress: Compression scheme to use for
          decompression ("gzip" or "bzip2"), or None to copy the file as-is
        """
        # Open the remote file
        obj = self.s3.Object(self.bucket_name, key)
        remote_file = obj.get()["Body"]

        # Write the dest file in binary mode
        with open(dest_path, "wb") as dest_file:
            # If the file is not compressed, just copy its content
            if not decompress:
                shutil.copyfileobj(remote_file, dest_file)
                return

            if decompress == "gzip":
                source_file = gzip.GzipFile(fileobj=remote_file, mode="rb")
            elif decompress == "bzip2":
                source_file = bz2.BZ2File(remote_file, "rb")
            else:
                raise ValueError("Unknown compression type: %s" % decompress)

            with source_file:
                shutil.copyfileobj(source_file, dest_file)

    def remote_open(self, key):
        """
        Open a remote S3 object and return a readable stream

        :param str key: The key identifying the object to open
        :return: A file-like object from which the stream can be read or None
          if the key does not exist
        """
        try:
            obj = self.s3.Object(self.bucket_name, key)
            return StreamingBodyIO(obj.get()["Body"])
        except ClientError as exc:
            error_code = exc.response["Error"]["Code"]
            if error_code == "NoSuchKey":
                return None
            else:
                raise

    def upload_fileobj(self, fileobj, key):
        """
        Synchronously upload the content of a file-like object to a cloud key

        :param IOBase fileobj: File-like object to upload
        :param str key: The key to identify the uploaded object
        """
        self.s3.meta.client.upload_fileobj(
            Fileobj=fileobj,
            Bucket=self.bucket_name,
            Key=key,
            ExtraArgs=self._extra_upload_args,
        )

    def create_multipart_upload(self, key):
        """
        Create a new multipart upload

        :param key: The key to use in the cloud service
        :return: The multipart upload handle
        :rtype: dict[str, str]
        """
        return self.s3.meta.client.create_multipart_upload(
            Bucket=self.bucket_name, Key=key, **self._extra_upload_args
        )

    def _upload_part(self, upload_metadata, key, body, part_number):
        """
        Upload a part into this multipart upload

        :param dict upload_metadata: The multipart upload handle
        :param str key: The key to use in the cloud service
        :param object body: A stream-like object to upload
        :param int part_number: Part number, starting from 1
        :return: The part handle
        :rtype: dict[str, int|str]
        """
        part = self.s3.meta.client.upload_part(
            Body=body,
            Bucket=self.bucket_name,
            Key=key,
            UploadId=upload_metadata["UploadId"],
            PartNumber=part_number,
        )
        return {
            "PartNumber": part_number,
            "ETag": part["ETag"],
        }

    def _complete_multipart_upload(self, upload_metadata, key, parts):
        """
        Finish the specified multipart upload

        :param dict upload_metadata: The multipart upload handle
        :param str key: The key to use in the cloud service
        :param List[dict] parts: The list of parts composing the multipart
          upload
        """
        self.s3.meta.client.complete_multipart_upload(
            Bucket=self.bucket_name,
            Key=key,
            UploadId=upload_metadata["UploadId"],
            MultipartUpload={"Parts": parts},
        )

    def _abort_multipart_upload(self, upload_metadata, key):
        """
        Abort the specified multipart upload

        :param dict upload_metadata: The multipart upload handle
        :param str key: The key to use in the cloud service
        """
        self.s3.meta.client.abort_multipart_upload(
            Bucket=self.bucket_name, Key=key, UploadId=upload_metadata["UploadId"]
        )

    def delete_objects(self, paths):
        """
        Delete the objects at the specified paths

        :param List[str] paths: The paths of the objects to be deleted
        """
        # Explicitly check if we are being asked to delete nothing at all and
        # if so return without error.
        if len(paths) == 0:
            return

        # S3 bulk deletion is limited to batches of 1000 keys
        batch_size = 1000
        try:
            # If xrange exists then we are on python 2 so we need to use it
            range_fun = xrange
        except NameError:
            # Otherwise just use range
            range_fun = range
        errors = False
        for i in range_fun(0, len(paths), batch_size):
            resp = self.s3.meta.client.delete_objects(
                Bucket=self.bucket_name,
                Delete={
                    "Objects": [{"Key": path} for path in paths[i : i + batch_size]],
                    "Quiet": True,
                },
            )
            if "Errors" in resp:
                errors = True
                for error_dict in resp["Errors"]:
                    logging.error(
                        'Deletion of object %s failed with error code: "%s", '
                        'message: "%s"',
                        error_dict["Key"],
                        error_dict["Code"],
                        error_dict["Message"],
                    )
        if errors:
            raise CloudProviderError(
                "Error from cloud provider while deleting objects - "
                "please check the Barman logs"
            )
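
# Illustrative usage sketch (not part of Barman's public interface: the real
# entry points are the barman-cloud-* scripts, and the bucket name below is
# made up). Running this module directly would check that the cloud provider
# is reachable and list the keys right under the configured path.
if __name__ == "__main__":
    cloud_interface = S3CloudInterface(
        url="s3://my-backup-bucket/postgres",  # hypothetical destination
        profile_name=None,  # use the default AWS credential resolution chain
    )
    if not cloud_interface.test_connectivity():
        raise SystemExit("Cloud provider is not reachable")
    # List objects and "directories" right under the configured path
    for entry in cloud_interface.list_bucket(cloud_interface.path + "/"):
        print(entry)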