# -*- coding: utf-8 -*-
# © Copyright EnterpriseDB UK Limited 2018-2021
#
# This file is part of Barman.
#
# Barman is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# Barman is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with Barman.  If not, see <http://www.gnu.org/licenses/>

import bz2
import gzip
import logging
import shutil
from io import RawIOBase

from barman.cloud import CloudInterface, CloudProviderError

try:
    # Python 3.x
    from urllib.parse import urlparse
except ImportError:
    # Python 2.x
    from urlparse import urlparse

try:
    import boto3
    from botocore.exceptions import ClientError, EndpointConnectionError
except ImportError:
    raise SystemExit("Missing required python module: boto3")


class StreamingBodyIO(RawIOBase):
    """
    Wrap a boto StreamingBody in the IOBase API.
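
    Example (illustrative sketch; assumes `obj` is a boto3 `s3.Object`
    handle obtained elsewhere)::

        body = obj.get()["Body"]
        stream = StreamingBodyIO(body)
        chunk = stream.read(1024)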
    """

    def __init__(self, body):
        self.body = body

    def readable(self):
        return True

    def read(self, n=-1):
        n = None if n < 0 else n
        return self.body.read(n)


class S3CloudInterface(CloudInterface):
    # S3 multipart upload limitations
    # http://docs.aws.amazon.com/AmazonS3/latest/API/mpUploadUploadPart.html
    MAX_CHUNKS_PER_FILE = 10000
    MIN_CHUNK_SIZE = 5 << 20

    # S3 permits a maximum of 5 TB per file
    # https://docs.aws.amazon.com/AmazonS3/latest/dev/UploadingObjects.html
    # This is a hard limit. Because our upload procedure can exceed the
    # configured MAX_ARCHIVE_SIZE, we set a maximum of 1 TB per file.
    MAX_ARCHIVE_SIZE = 1 << 40

    def __getstate__(self):
        state = self.__dict__.copy()
        # Remove boto3 client reference from the state as it cannot be pickled
        # in Python >= 3.8 and multiprocessing will pickle the object when the
        # worker processes are created.
        # The worker processes create their own boto3 sessions so do not need
        # the boto3 session from the parent process.
        del state["s3"]
        return state

    def __setstate__(self, state):
        self.__dict__.update(state)

    def __init__(
        self, url, encryption=None, jobs=2, profile_name=None, endpoint_url=None
    ):
        """
        Create a new S3 interface given the S3 destination URL and the profile
        name

        :param str url: Full URL of the cloud destination/source
        :param str|None encryption: Encryption type string
        :param int jobs: How many sub-processes to use for asynchronous
          uploading, defaults to 2.
        :param str profile_name: Amazon auth profile identifier
        :param str endpoint_url: Override the default endpoint detection
          strategy with this one
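
        Example (illustrative sketch; the bucket name, destination path and
        encryption setting below are assumptions, not defaults)::

            cloud_interface = S3CloudInterface(
                url="s3://my-bucket/postgres/backups",
                encryption="AES256",
                jobs=2,
            )
            if not cloud_interface.test_connectivity():
                raise SystemExit("AWS is not reachable")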
        """
        super(S3CloudInterface, self).__init__(
            url=url,
            jobs=jobs,
        )
        self.profile_name = profile_name
        self.encryption = encryption
        self.endpoint_url = endpoint_url

        # Extract information from the destination URL
        parsed_url = urlparse(url)
        # If netloc is not present, the s3 url is badly formatted.
        if parsed_url.netloc == "" or parsed_url.scheme != "s3":
            raise ValueError("Invalid s3 URL address: %s" % url)
        self.bucket_name = parsed_url.netloc
        self.bucket_exists = None
        self.path = parsed_url.path.lstrip("/")

        # Build a session, so we can extract the correct resource
        self._reinit_session()

    def _reinit_session(self):
        """
        Create a new session
        """
        session = boto3.Session(profile_name=self.profile_name)
        self.s3 = session.resource("s3", endpoint_url=self.endpoint_url)

    @property
    def _extra_upload_args(self):
        """
        Return a dict containing ExtraArgs to be passed to certain boto3 calls

        Because some boto3 calls accept `ExtraArgs: {}` and others do not, we
        return a nested dict which can be expanded with `**` in the boto3 call.
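
        Example (illustrative; assumes the interface was created with
        `encryption="AES256"`, in which case the property evaluates to
        `{"ServerSideEncryption": "AES256"}` and `key` is a placeholder)::

            self.s3.meta.client.create_multipart_upload(
                Bucket=self.bucket_name, Key=key, **self._extra_upload_args
            )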
        """
        additional_args = {}
        if self.encryption:
            additional_args["ServerSideEncryption"] = self.encryption
        return additional_args

    def test_connectivity(self):
        """
        Test AWS connectivity by trying to access a bucket
        """
        try:
            # We are not even interested in the existence of the bucket,
            # we just want to check whether AWS is reachable
            self.bucket_exists = self._check_bucket_existence()
            return True
        except EndpointConnectionError as exc:
            logging.error("Can't connect to cloud provider: %s", exc)
            return False

    def _check_bucket_existence(self):
        """
        Check cloud storage for the target bucket

        :return: True if the bucket exists, False otherwise
        :rtype: bool
        """
        try:
            # Search the bucket on s3
            self.s3.meta.client.head_bucket(Bucket=self.bucket_name)
            return True
        except ClientError as exc:
            # If a client error is thrown, then check the error code.
            # If the code is 404, then the bucket does not exist.
            error_code = exc.response["Error"]["Code"]
            if error_code == "404":
                return False
            # Otherwise there is nothing to do other than re-raise the
            # original exception
            raise

    def _create_bucket(self):
        """
        Create the bucket in cloud storage
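
        Example (illustrative; shows how bucket creation is typically guarded
        by an existence check in this interface)::

            if not self._check_bucket_existence():
                self._create_bucket()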
        """
        # Get the current region from client.
        # Do not use session.region_name here because it may be None
        region = self.s3.meta.client.meta.region_name
        logging.info(
            "Bucket '%s' does not exist, creating it on region '%s'",
            self.bucket_name,
            region,
        )
        create_bucket_config = {
            "ACL": "private",
        }
        # The location constraint is required during bucket creation
        # for all regions outside of us-east-1. This constraint cannot
        # be specified in us-east-1; specifying it in this region
        # results in a failure, so we will only
        # add it if we are deploying outside of us-east-1.
        # See https://github.com/boto/boto3/issues/125
        if region != "us-east-1":
            create_bucket_config["CreateBucketConfiguration"] = {
                "LocationConstraint": region,
            }
        self.s3.Bucket(self.bucket_name).create(**create_bucket_config)

    def list_bucket(self, prefix="", delimiter="/"):
        """
        List bucket content in a directory manner

        :param str prefix: The object key prefix under which to list content
        :param str delimiter: The delimiter used to group keys into "directories"
        :return: List of objects and dirs right under the prefix
        :rtype: List[str]
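
        Example (illustrative; `cloud_interface` and the key names are
        assumptions)::

            for item in cloud_interface.list_bucket("my/path/", delimiter="/"):
                # Yields "directory" prefixes first, then object keys, e.g.
                # "my/path/base/" followed by "my/path/backup.info"
                print(item)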
        """
        if prefix.startswith(delimiter):
            prefix = prefix.lstrip(delimiter)

        res = self.s3.meta.client.list_objects_v2(
            Bucket=self.bucket_name, Prefix=prefix, Delimiter=delimiter
        )

        # List "folders"
        keys = res.get("CommonPrefixes")
        if keys is not None:
            for k in keys:
                yield k.get("Prefix")

        # List "files"
        objects = res.get("Contents")
        if objects is not None:
            for o in objects:
                yield o.get("Key")

    def download_file(self, key, dest_path, decompress):
        """
        Download a file from S3

        :param str key: The S3 key to download
        :param str dest_path: Where to put the destination file
        :param str|None decompress: Compression type ("gzip" or "bzip2") used
          to decompress the file, or None to copy it as-is
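
        Example (illustrative; `cloud_interface`, the key and the destination
        path are assumptions)::

            cloud_interface.download_file(
                "my/path/000000010000000000000001.gz",
                "/tmp/000000010000000000000001",
                decompress="gzip",
            )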
        """
        # Open the remote file
        obj = self.s3.Object(self.bucket_name, key)
        remote_file = obj.get()["Body"]

        # Write the dest file in binary mode
        with open(dest_path, "wb") as dest_file:
            # If the file is not compressed, just copy its content
            if not decompress:
                shutil.copyfileobj(remote_file, dest_file)
                return

            if decompress == "gzip":
                source_file = gzip.GzipFile(fileobj=remote_file, mode="rb")
            elif decompress == "bzip2":
                source_file = bz2.BZ2File(remote_file, "rb")
            else:
                raise ValueError("Unknown compression type: %s" % decompress)

            with source_file:
                shutil.copyfileobj(source_file, dest_file)

    def remote_open(self, key):
        """
        Open a remote S3 object and return a readable stream

        :param str key: The key identifying the object to open
        :return: A file-like object from which the stream can be read or None if
          the key does not exist
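
        Example (illustrative; `cloud_interface` and the key are assumptions)::

            stream = cloud_interface.remote_open("my/path/backup.info")
            if stream is not None:
                content = stream.read()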
        """
        try:
            obj = self.s3.Object(self.bucket_name, key)
            return StreamingBodyIO(obj.get()["Body"])
        except ClientError as exc:
            error_code = exc.response["Error"]["Code"]
            if error_code == "NoSuchKey":
                return None
            else:
                raise

    def upload_fileobj(self, fileobj, key):
        """
        Synchronously upload the content of a file-like object to a cloud key

        :param IOBase fileobj: File-like object to upload
        :param str key: The key to identify the uploaded object
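
        Example (illustrative; `cloud_interface`, the source file and the key
        are assumptions)::

            with open("/tmp/backup.info", "rb") as fileobj:
                cloud_interface.upload_fileobj(fileobj, "my/path/backup.info")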
        """
        self.s3.meta.client.upload_fileobj(
            Fileobj=fileobj,
            Bucket=self.bucket_name,
            Key=key,
            ExtraArgs=self._extra_upload_args,
        )

    def create_multipart_upload(self, key):
        """
        Create a new multipart upload

        :param key: The key to use in the cloud service
        :return: The multipart upload handle
        :rtype: dict[str, str]
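
        Example of the full multipart flow (illustrative; `cloud_interface`,
        `key` and the single part `body` are assumptions)::

            upload_metadata = cloud_interface.create_multipart_upload(key)
            parts = [
                cloud_interface._upload_part(upload_metadata, key, body, 1),
            ]
            cloud_interface._complete_multipart_upload(upload_metadata, key, parts)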
        """
        return self.s3.meta.client.create_multipart_upload(
            Bucket=self.bucket_name, Key=key, **self._extra_upload_args
        )

    def _upload_part(self, upload_metadata, key, body, part_number):
        """
        Upload a part into this multipart upload

        :param dict upload_metadata: The multipart upload handle
        :param str key: The key to use in the cloud service
        :param object body: A stream-like object to upload
        :param int part_number: Part number, starting from 1
        :return: The part handle
        :rtype: dict[str, None|str]
        """
        part = self.s3.meta.client.upload_part(
            Body=body,
            Bucket=self.bucket_name,
            Key=key,
            UploadId=upload_metadata["UploadId"],
            PartNumber=part_number,
        )
        return {
            "PartNumber": part_number,
            "ETag": part["ETag"],
        }

    def _complete_multipart_upload(self, upload_metadata, key, parts):
        """
        Finish a multipart upload

        :param dict upload_metadata: The multipart upload handle
        :param str key: The key to use in the cloud service
        :param parts: The list of parts composing the multipart upload
        """
        self.s3.meta.client.complete_multipart_upload(
            Bucket=self.bucket_name,
            Key=key,
            UploadId=upload_metadata["UploadId"],
            MultipartUpload={"Parts": parts},
        )

    def _abort_multipart_upload(self, upload_metadata, key):
        """
        Abort a multipart upload

        :param dict upload_metadata: The multipart upload handle
        :param str key: The key to use in the cloud service
        """
        self.s3.meta.client.abort_multipart_upload(
            Bucket=self.bucket_name, Key=key, UploadId=upload_metadata["UploadId"]
        )

    def delete_objects(self, paths):
        """
        Delete the objects at the specified paths

        :param List[str] paths: The keys of the objects to delete
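
        Example (illustrative; `cloud_interface` and the keys are assumptions)::

            cloud_interface.delete_objects(
                ["my/path/000000010000000000000001.gz", "my/path/backup.info"]
            )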
        """
        # Explicitly check if we are being asked to delete nothing at all and if
        # so return without error.
        if len(paths) == 0:
            return

        # S3 bulk deletion is limited to batches of 1000 keys
        batch_size = 1000
        try:
            # If xrange exists then we are on python 2 so we need to use it
            range_fun = xrange
        except NameError:
            # Otherwise just use range
            range_fun = range
        errors = False
        for i in range_fun(0, len(paths), batch_size):
            resp = self.s3.meta.client.delete_objects(
                Bucket=self.bucket_name,
                Delete={
                    "Objects": [{"Key": path} for path in paths[i : i + batch_size]],
                    "Quiet": True,
                },
            )
            if "Errors" in resp:
                errors = True
                for error_dict in resp["Errors"]:
                    logging.error(
                        'Deletion of object %s failed with error code: "%s", message: "%s"'
                        % (error_dict["Key"], error_dict["Code"], error_dict["Message"])
                    )
        if errors:
            raise CloudProviderError(
                "Error from cloud provider while deleting objects - "
                "please check the Barman logs"
            )