# Copyright 2010 OpenStack Foundation
# All Rights Reserved.
#
#    Licensed under the Apache License, Version 2.0 (the "License"); you may
#    not use this file except in compliance with the License. You may obtain
#    a copy of the License at
#
#         http://www.apache.org/licenses/LICENSE-2.0
#
#    Unless required by applicable law or agreed to in writing, software
#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
#    License for the specific language governing permissions and limitations
#    under the License.

"""Storage backend for S3 or storage servers that follow the S3 protocol"""

import logging
import math
import re

from boto3 import session as boto_session
from botocore import client as boto_client
from botocore import exceptions as boto_exceptions
from botocore import utils as boto_utils
import eventlet
from oslo_config import cfg
from oslo_utils import encodeutils
from oslo_utils import units
import six
from six.moves import urllib

import glance_store
from glance_store import capabilities
from glance_store.common import utils
import glance_store.driver
from glance_store import exceptions
from glance_store.i18n import _
import glance_store.location

LOG = logging.getLogger(__name__)

DEFAULT_LARGE_OBJECT_SIZE = 100          # 100M
DEFAULT_LARGE_OBJECT_CHUNK_SIZE = 10     # 10M
DEFAULT_LARGE_OBJECT_MIN_CHUNK_SIZE = 5  # 5M
DEFAULT_THREAD_POOLS = 10                # 10 pools
MAX_PART_NUM = 10000                     # 10000 upload parts

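# Note: the size constants above are interpreted as MiB and converted at
# configuration time; e.g. (illustrative) 100 * units.Mi == 104857600 bytes
# for the default multipart threshold.
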
_S3_OPTS = [
    cfg.StrOpt('s3_store_host',
               help="""
The host where the S3 server is listening.

This configuration option sets the host of the S3 or S3-compatible storage
server. This option is required when using the S3 storage backend.
The host can contain a DNS name (e.g. s3.amazonaws.com, my-object-storage.com)
or an IP address (127.0.0.1).

Possible values:
    * A valid DNS name
    * A valid IPv4 address

Related Options:
    * s3_store_access_key
    * s3_store_secret_key

"""),
    cfg.StrOpt('s3_store_access_key',
               secret=True,
               help="""
The S3 query token access key.

This configuration option takes the access key for authenticating with the
Amazon S3 or S3-compatible storage server. This option is required when using
the S3 storage backend.

Possible values:
    * Any string value that is the access key for a user with appropriate
      privileges

Related Options:
    * s3_store_host
    * s3_store_secret_key

"""),
    cfg.StrOpt('s3_store_secret_key',
               secret=True,
               help="""
The S3 query token secret key.

This configuration option takes the secret key for authenticating with the
Amazon S3 or S3-compatible storage server. This option is required when using
the S3 storage backend.

Possible values:
    * Any string value that is a secret key corresponding to the access key
      specified using the ``s3_store_access_key`` option

Related Options:
    * s3_store_host
    * s3_store_access_key

"""),
    cfg.StrOpt('s3_store_bucket',
               help="""
The S3 bucket to be used to store the Glance data.

This configuration option specifies the bucket in which the glance images
will be stored in S3. If ``s3_store_create_bucket_on_put`` is set to true,
the bucket will be created automatically if it does not exist.

Possible values:
    * Any string value

Related Options:
    * s3_store_create_bucket_on_put
    * s3_store_bucket_url_format

"""),
    cfg.BoolOpt('s3_store_create_bucket_on_put',
                default=False,
                help="""
Determine whether to create a new bucket in S3 on write.

This configuration option takes a boolean value to indicate whether Glance
should create the bucket in S3 if it does not exist when an image is
uploaded.

Possible values:
    * Any Boolean value

Related Options:
    * None

"""),
    cfg.StrOpt('s3_store_bucket_url_format',
               default='auto',
               help="""
The S3 calling format used to determine the address of an object.

This configuration option takes the access model that is used to specify
the address of an object in an S3 bucket.

NOTE:
In ``path``-style, the endpoint for the object looks like
'https://s3.amazonaws.com/bucket/example.img'.
In ``virtual``-style, the endpoint for the object looks like
'https://bucket.s3.amazonaws.com/example.img'.
If the bucket name does not follow the DNS naming convention, you can
access objects in the path style, but not in the virtual style.

Possible values:
    * Any string value of ``auto``, ``virtual``, or ``path``

Related Options:
    * s3_store_bucket

"""),
    cfg.IntOpt('s3_store_large_object_size',
               default=DEFAULT_LARGE_OBJECT_SIZE,
               help="""
The size, in MB, at which Glance starts chunking image files and uploading
them to S3 with a multipart upload.

This configuration option takes a threshold in MB to determine whether to
upload the image to S3 as-is or to split it up (Multipart Upload).

Note: An image can be split into at most 10,000 parts.

Possible values:
    * Any positive integer value

Related Options:
    * s3_store_large_object_chunk_size
    * s3_store_thread_pools

"""),
    cfg.IntOpt('s3_store_large_object_chunk_size',
               default=DEFAULT_LARGE_OBJECT_CHUNK_SIZE,
               help="""
The part size, in MB, that S3 uses when uploading parts of a multipart
upload.

This configuration option takes the image split size in MB for Multipart
Upload.

Note: An image can be split into at most 10,000 parts.

Possible values:
    * Any positive integer value (must be at least 5 MB)

Related Options:
    * s3_store_large_object_size
    * s3_store_thread_pools

"""),
    cfg.IntOpt('s3_store_thread_pools',
               default=DEFAULT_THREAD_POOLS,
               help="""
The number of threads in the pool used to perform a multipart upload in S3.

This configuration option takes the size of the green thread pool used when
performing a Multipart Upload.

Possible values:
    * Any positive integer value

Related Options:
    * s3_store_large_object_size
    * s3_store_large_object_chunk_size

""")
]
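
# A hedged example of how these options might appear in glance-api.conf
# (the host, bucket, and credential values below are hypothetical):
#
#     [glance_store]
#     stores = s3
#     default_store = s3
#     s3_store_host = http://my-object-storage.example.com
#     s3_store_access_key = <ACCESS_KEY>
#     s3_store_secret_key = <SECRET_KEY>
#     s3_store_bucket = glance
#     s3_store_create_bucket_on_put = True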


class UploadPart(object):
    """Represents a single part of a multipart upload."""
    def __init__(self, mpu, fp, partnum, chunks):
        self.mpu = mpu
        self.partnum = partnum
        self.fp = fp
        self.size = 0
        self.chunks = chunks
        self.etag = {}
        self.success = True


def run_upload(s3_client, bucket, key, part):
    """Upload a single part to S3 and record the returned ETag and size
    in the part's info.

    :param s3_client: An object with credentials to connect to S3
    :param bucket: The S3 bucket name
    :param key: The object name to be stored (image identifier)
    :param part: UploadPart object used during the multipart upload
    """
    pnum = part.partnum
    bsize = part.chunks
    upload_id = part.mpu['UploadId']
    LOG.debug("Uploading upload part in S3 partnum=%(pnum)d, "
              "size=%(bsize)d, key=%(key)s, UploadId=%(UploadId)s",
              {'pnum': pnum, 'bsize': bsize, 'key': key,
               'UploadId': upload_id})

    try:
        resp = s3_client.upload_part(Body=part.fp,
                                     Bucket=bucket,
                                     ContentLength=bsize,
                                     Key=key,
                                     PartNumber=pnum,
                                     UploadId=upload_id)
        part.etag[part.partnum] = resp['ETag']
        part.size = bsize
    except boto_exceptions.ClientError as e:
        error_code = e.response['Error']['Code']
        error_message = e.response['Error']['Message']
        LOG.warning("Failed to upload part in S3 partnum=%(pnum)d, "
                    "size=%(bsize)d, error code=%(error_code)s, "
                    "error message=%(error_message)s",
                    {'pnum': pnum, 'bsize': bsize, 'error_code': error_code,
                     'error_message': error_message})
        part.success = False
    finally:
        part.fp.close()

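# A minimal usage sketch (objects here are hypothetical): parts are uploaded
# concurrently from a green thread pool, as _add_multipart does below:
#
#     pool = eventlet.greenpool.GreenPool(size=10)
#     pool.spawn_n(run_upload, s3_client, bucket, key, part)
#     pool.waitall()
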

class StoreLocation(glance_store.location.StoreLocation):
    """Class describing an S3 URI.

    An S3 URI can look like any of the following:

        s3://accesskey:secretkey@s3.amazonaws.com/bucket/key-id
        s3+https://accesskey:secretkey@s3.amazonaws.com/bucket/key-id

    The s3+https:// URIs indicate there is an HTTPS S3 service URL.
    """
    def process_specs(self):
        self.scheme = self.specs.get('scheme', 's3')
        self.accesskey = self.specs.get('accesskey')
        self.secretkey = self.specs.get('secretkey')
        s3_host = self.specs.get('s3serviceurl')
        self.bucket = self.specs.get('bucket')
        self.key = self.specs.get('key')

        if s3_host.startswith('https://'):
            self.scheme = 's3+https'
            s3_host = s3_host[len('https://'):].strip('/')
        elif s3_host.startswith('http://'):
            s3_host = s3_host[len('http://'):].strip('/')
        self.s3serviceurl = s3_host.strip('/')

    def _get_credstring(self):
        if self.accesskey:
            return '%s:%s@' % (self.accesskey, self.secretkey)
        return ''

    def get_uri(self):
        return "%s://%s%s/%s/%s" % (self.scheme, self._get_credstring(),
                                    self.s3serviceurl, self.bucket, self.key)

    def parse_uri(self, uri):
        """Parse URLs.

        Note that an Amazon AWS secret key can contain the forward slash,
        which breaks urlparse miserably. This function works around that
        issue.
        """
        # Make sure that URIs that contain multiple schemes, such as:
        # s3://accesskey:secretkey@https://s3.amazonaws.com/bucket/key-id
        # are immediately rejected.
        if uri.count('://') != 1:
            reason = ("URI cannot contain more than one occurrence "
                      "of a scheme. If you have specified a URI like "
                      "s3://accesskey:secretkey@"
                      "https://s3.amazonaws.com/bucket/key-id"
                      ", you need to change it to use the "
                      "s3+https:// scheme, like so: "
                      "s3+https://accesskey:secretkey@"
                      "s3.amazonaws.com/bucket/key-id")
            LOG.info("Invalid store uri: %s", reason)
            raise exceptions.BadStoreUri(uri=uri)

        pieces = urllib.parse.urlparse(uri)
        self.validate_schemas(uri, valid_schemas=(
            's3://', 's3+http://', 's3+https://'))
        self.scheme = pieces.scheme
        path = pieces.path.strip('/')
        netloc = pieces.netloc.strip('/')
        entire_path = (netloc + '/' + path).strip('/')

        if '@' in uri:
            creds, path = entire_path.split('@')
            cred_parts = creds.split(':')

            try:
                self.accesskey = cred_parts[0]
                self.secretkey = cred_parts[1]
            except IndexError:
                LOG.error("Badly formed S3 credentials")
                raise exceptions.BadStoreUri(uri=uri)
        else:
            self.accesskey = None
            path = entire_path
        try:
            path_parts = path.split('/')
            self.key = path_parts.pop()
            self.bucket = path_parts.pop()
            if path_parts:
                self.s3serviceurl = '/'.join(path_parts).strip('/')
            else:
                LOG.error("Badly formed S3 URI. Missing s3 service URL.")
                raise exceptions.BadStoreUri(uri=uri)
        except IndexError:
            LOG.error("Badly formed S3 URI")
            raise exceptions.BadStoreUri(uri=uri)
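
    # A doctest-style sketch of parse_uri (credential and host values here
    # are hypothetical):
    #
    #     >>> loc = StoreLocation({}, None)
    #     >>> loc.parse_uri('s3+https://akey:skey@s3.example.com/glance/img-1')
    #     >>> loc.scheme, loc.s3serviceurl, loc.bucket, loc.key
    #     ('s3+https', 's3.example.com', 'glance', 'img-1')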


class Store(glance_store.driver.Store):
    """An implementation of the s3 adapter."""

    _CAPABILITIES = capabilities.BitMasks.RW_ACCESS
    OPTIONS = _S3_OPTS
    EXAMPLE_URL = "s3://<ACCESS_KEY>:<SECRET_KEY>@<S3_URL>/<BUCKET>/<OBJ>"

    READ_CHUNKSIZE = 64 * units.Ki
    WRITE_CHUNKSIZE = 5 * units.Mi

    @staticmethod
    def get_schemes():
        return 's3', 's3+http', 's3+https'

    def configure_add(self):
        """
        Configure the Store to use the stored configuration options.
        Any store that needs special configuration should implement
        this method. If the store was not able to successfully configure
        itself, it should raise `exceptions.BadStoreConfiguration`.
        """
        self.s3_host = self._option_get('s3_store_host')
        self.access_key = self._option_get('s3_store_access_key')
        self.secret_key = self._option_get('s3_store_secret_key')
        self.bucket = self._option_get('s3_store_bucket')

        self.scheme = 's3'
        if self.s3_host.startswith('https://'):
            self.scheme = 's3+https'
            self.full_s3_host = self.s3_host
        elif self.s3_host.startswith('http://'):
            self.full_s3_host = self.s3_host
        else:  # Default to http
            self.full_s3_host = 'http://' + self.s3_host

        _s3_obj_size = self._option_get('s3_store_large_object_size')
        self.s3_store_large_object_size = _s3_obj_size * units.Mi
        _s3_ck_size = self._option_get('s3_store_large_object_chunk_size')
        _s3_ck_min = DEFAULT_LARGE_OBJECT_MIN_CHUNK_SIZE
        if _s3_ck_size < _s3_ck_min:
            reason = _("s3_store_large_object_chunk_size must be at "
                       "least %d MB.") % _s3_ck_min
            LOG.error(reason)
            raise exceptions.BadStoreConfiguration(store_name="s3",
                                                   reason=reason)
        self.s3_store_large_object_chunk_size = _s3_ck_size * units.Mi

        self.s3_store_thread_pools = self._option_get('s3_store_thread_pools')
        if self.s3_store_thread_pools <= 0:
            reason = _("s3_store_thread_pools must be a positive "
                       "integer. %s") % self.s3_store_thread_pools
            LOG.error(reason)
            raise exceptions.BadStoreConfiguration(store_name="s3",
                                                   reason=reason)

        if self.backend_group:
            self._set_url_prefix()

    def _set_url_prefix(self):
        s3_host = self.s3_host
        if s3_host.startswith('http://'):
            s3_host = s3_host[len('http://'):]
        elif s3_host.startswith('https://'):
            s3_host = s3_host[len('https://'):]

        self._url_prefix = "%s://%s:%s@%s/%s" % (self.scheme, self.access_key,
                                                 self.secret_key, s3_host,
                                                 self.bucket)

    def _option_get(self, param):
        if self.backend_group:
            store_conf = getattr(self.conf, self.backend_group)
        else:
            store_conf = self.conf.glance_store

        result = getattr(store_conf, param)
        if not result:
            if param == 's3_store_create_bucket_on_put':
                return result
            reason = _("Could not find %s in configuration options.") % param
            LOG.error(reason)
            raise exceptions.BadStoreConfiguration(store_name="s3",
                                                   reason=reason)
        return result

    def _create_s3_client(self, loc):
        """Create a client object to use when connecting to S3.

        :param loc: `glance_store.location.Location` object, supplied
                    from glance_store.location.get_location_from_uri()
        :returns: An object with credentials to connect to S3
        """
        s3_host = self._option_get('s3_store_host')
        url_format = self._option_get('s3_store_bucket_url_format')
        calling_format = {'addressing_style': url_format}

        session = boto_session.Session(aws_access_key_id=loc.accesskey,
                                       aws_secret_access_key=loc.secretkey)
        config = boto_client.Config(s3=calling_format)
        location = get_s3_location(s3_host)

        bucket_name = loc.bucket
        if (url_format == 'virtual' and
                not boto_utils.check_dns_name(bucket_name)):
            raise boto_exceptions.InvalidDNSNameError(bucket_name=bucket_name)

        region_name, endpoint_url = None, None
        if location:
            region_name = location
        else:
            endpoint_url = s3_host

        return session.client(service_name='s3',
                              endpoint_url=endpoint_url,
                              region_name=region_name,
                              use_ssl=(loc.scheme == 's3+https'),
                              config=config)
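
    # For reference, a hedged sketch of how the client resolves (hosts here
    # are hypothetical): 's3-us-west-2.amazonaws.com' maps to
    # region_name='us-west-2' with no endpoint_url, while an S3-compatible
    # host such as 'http://my-object-storage.example.com:9000' yields
    # endpoint_url='http://my-object-storage.example.com:9000' and
    # region_name=None.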

    def _operation_set(self, loc):
        """Return the objects and values frequently used when operating
        on S3, as a tuple.

        :param loc: `glance_store.location.Location` object, supplied
                     from glance_store.location.get_location_from_uri()
        :returns: tuple of: (1) S3 client object, (2) Bucket name,
                  (3) Image Object name
        """
        return self._create_s3_client(loc), loc.bucket, loc.key


    @capabilities.check
    def get(self, location, offset=0, chunk_size=None, context=None):
        """
        Takes a `glance_store.location.Location` object that indicates
        where to find the image file, and returns a tuple of generator
        (for reading the image file) and image_size.

        :param location: `glance_store.location.Location` object, supplied
                         from glance_store.location.get_location_from_uri()
        :raises: `glance_store.exceptions.NotFound` if image does not exist
        """
        loc = location.store_location
        s3_client, bucket, key = self._operation_set(loc)

        if not self._object_exists(s3_client, bucket, key):
            LOG.warning("Could not find key %(key)s in "
                        "bucket %(bucket)s", {'key': key, 'bucket': bucket})
            raise exceptions.NotFound(image=key)

        resp = s3_client.get_object(Bucket=bucket, Key=key)

        LOG.debug("Retrieved image object from S3 using s3_host=%(s3_host)s, "
                  "access_key=%(accesskey)s, bucket=%(bucket)s, "
                  "key=%(key)s",
                  {'s3_host': loc.s3serviceurl, 'accesskey': loc.accesskey,
                   'bucket': bucket, 'key': key})

        cs = self.READ_CHUNKSIZE

        class ResponseIndexable(glance_store.Indexable):
            def another(self):
                try:
                    return next(self.wrapped)
                except StopIteration:
                    return b''

        return (ResponseIndexable(utils.chunkiter(resp['Body'], cs),
                                  resp['ContentLength']),
                resp['ContentLength'])

    def get_size(self, location, context=None):
        """
        Takes a `glance_store.location.Location` object that indicates
        where to find the image file and returns the image size.

        :param location: `glance_store.location.Location` object, supplied
                         from glance_store.location.get_location_from_uri()
        :raises: `glance_store.exceptions.NotFound` if image does not exist
        :rtype: int
        """
        loc = location.store_location
        s3_client, bucket, key = self._operation_set(loc)

        if not self._object_exists(s3_client, bucket, key):
            LOG.warning("Could not find key %(key)s in "
                        "bucket %(bucket)s", {'key': key, 'bucket': bucket})
            raise exceptions.NotFound(image=key)

        resp = s3_client.head_object(Bucket=bucket, Key=key)
        return resp['ContentLength']

    @capabilities.check
    def add(self, image_id, image_file, image_size, hashing_algo,
            context=None, verifier=None):
        """
        Stores an image file with supplied identifier to the backend
        storage system and returns a tuple containing information
        about the stored image.

        :param image_id: The opaque image identifier
        :param image_file: The image data to write, as a file-like object
        :param image_size: The size of the image data to write, in bytes
        :param hashing_algo: A hashlib algorithm identifier (string)
        :param context: A context object
        :param verifier: An object used to verify signatures for images

        :returns: tuple of: (1) URL in backing store, (2) bytes written,
                  (3) checksum, (4) multihash value, and (5) a dictionary
                  with storage system specific information
        :raises: `glance_store.exceptions.Duplicate` if the image already
                 exists
        """
        loc = StoreLocation(store_specs={'scheme': self.scheme,
                                         'bucket': self.bucket,
                                         'key': image_id,
                                         's3serviceurl': self.full_s3_host,
                                         'accesskey': self.access_key,
                                         'secretkey': self.secret_key},
                            conf=self.conf,
                            backend_group=self.backend_group)

        s3_client, bucket, key = self._operation_set(loc)

        if not self._bucket_exists(s3_client, bucket):
            if self._option_get('s3_store_create_bucket_on_put'):
                self._create_bucket(s3_client,
                                    self._option_get('s3_store_host'),
                                    bucket)
            else:
                msg = (_("The bucket %s does not exist in "
                         "S3. Please set the "
                         "s3_store_create_bucket_on_put option "
                         "to create the bucket automatically.") % bucket)
                raise glance_store.BackendException(msg)

        LOG.debug("Adding image object to S3 using (s3_host=%(s3_host)s, "
                  "access_key=%(access_key)s, bucket=%(bucket)s, "
                  "key=%(key)s)",
                  {'s3_host': self.s3_host, 'access_key': loc.accesskey,
                   'bucket': bucket, 'key': key})

        if not self._object_exists(s3_client, bucket, key):
            if image_size < self.s3_store_large_object_size:
                return self._add_singlepart(s3_client=s3_client,
                                            image_file=image_file,
                                            bucket=bucket,
                                            key=key,
                                            loc=loc,
                                            hashing_algo=hashing_algo,
                                            verifier=verifier)

            return self._add_multipart(s3_client=s3_client,
                                       image_file=image_file,
                                       image_size=image_size,
                                       bucket=bucket,
                                       key=key,
                                       loc=loc,
                                       hashing_algo=hashing_algo,
                                       verifier=verifier)
        LOG.warning("S3 already has an image with bucket ID %(bucket)s, "
                    "key %(key)s", {'bucket': bucket, 'key': key})
        raise exceptions.Duplicate(image=key)

    def _add_singlepart(self, s3_client, image_file, bucket, key, loc,
                        hashing_algo, verifier):
        """Stores an image file with a single part upload to S3 backend.

        :param s3_client: An object with credentials to connect to S3
        :param image_file: The image data to write, as a file-like object
        :param bucket: S3 bucket name
        :param key: The object name to be stored (image identifier)
        :param loc: `glance_store.location.Location` object, supplied
                    from glance_store.location.get_location_from_uri()
        :param hashing_algo: A hashlib algorithm identifier (string)
        :param verifier: An object used to verify signatures for images
        :returns: tuple of: (1) URL in backing store, (2) bytes written,
                  (3) checksum, (4) multihash value, and (5) a dictionary
                  with storage system specific information
        """
        os_hash_value = utils.get_hasher(hashing_algo, False)
        checksum = utils.get_hasher('md5', False)
        image_data = b''
        image_size = 0
        for chunk in utils.chunkreadable(image_file, self.WRITE_CHUNKSIZE):
            image_data += chunk
            image_size += len(chunk)
            os_hash_value.update(chunk)
            checksum.update(chunk)
            if verifier:
                verifier.update(chunk)

        s3_client.put_object(Body=image_data,
                             Bucket=bucket,
                             Key=key)
        hash_hex = os_hash_value.hexdigest()
        checksum_hex = checksum.hexdigest()

        # Add store backend information to location metadata
        metadata = {}
        if self.backend_group:
            metadata['store'] = self.backend_group

        LOG.debug("Wrote %(size)d bytes to S3 key named %(key)s "
                  "with checksum %(checksum)s",
                  {'size': image_size, 'key': key, 'checksum': checksum_hex})

        return loc.get_uri(), image_size, checksum_hex, hash_hex, metadata

    def _add_multipart(self, s3_client, image_file, image_size, bucket,
                       key, loc, hashing_algo, verifier):
        """Stores an image file with a multi part upload to S3 backend.

        :param s3_client: An object with credentials to connect to S3
        :param image_file: The image data to write, as a file-like object
        :param bucket: S3 bucket name
        :param key: The object name to be stored (image identifier)
        :param loc: `glance_store.location.Location` object, supplied
                    from glance_store.location.get_location_from_uri()
        :param hashing_algo: A hashlib algorithm identifier (string)
        :param verifier: An object used to verify signatures for images
        :returns: tuple of: (1) URL in backing store, (2) bytes written,
                  (3) checksum, (4) multihash value, and (5) a dictionary
                  with storage system specific information
        """
        os_hash_value = utils.get_hasher(hashing_algo, False)
        checksum = utils.get_hasher('md5', False)
        pool_size = self.s3_store_thread_pools
        pool = eventlet.greenpool.GreenPool(size=pool_size)
        mpu = s3_client.create_multipart_upload(Bucket=bucket, Key=key)
        upload_id = mpu['UploadId']
        LOG.debug("Multipart initiate key=%(key)s, UploadId=%(UploadId)s",
                  {'key': key, 'UploadId': upload_id})
        cstart = 0
        plist = []

        chunk_size = int(math.ceil(float(image_size) / MAX_PART_NUM))
        write_chunk_size = max(self.s3_store_large_object_chunk_size,
                               chunk_size)
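        # Worked example (illustrative): with the 10 MB default chunk size,
        # a hypothetical 200 GiB image gives
        # chunk_size = ceil(214748364800 / 10000) = 21474837 bytes
        # (~20.5 MiB), so write_chunk_size grows past the configured size
        # to keep the part count within MAX_PART_NUM.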
        it = utils.chunkreadable(image_file, self.WRITE_CHUNKSIZE)
        buffered_chunk = b''
        while True:
            try:
                buffered_clen = len(buffered_chunk)
                if buffered_clen < write_chunk_size:
                    # keep reading data
                    read_chunk = next(it)
                    buffered_chunk += read_chunk
                    continue
                else:
                    write_chunk = buffered_chunk[:write_chunk_size]
                    remained_data = buffered_chunk[write_chunk_size:]
                    os_hash_value.update(write_chunk)
                    checksum.update(write_chunk)
                    if verifier:
                        verifier.update(write_chunk)
                    fp = six.BytesIO(write_chunk)
                    fp.seek(0)
                    part = UploadPart(mpu, fp, cstart + 1, len(write_chunk))
                    pool.spawn_n(run_upload, s3_client, bucket, key, part)
                    plist.append(part)
                    cstart += 1
                    buffered_chunk = remained_data
            except StopIteration:
                if len(buffered_chunk) > 0:
                    # Write the last chunk data
                    write_chunk = buffered_chunk
                    os_hash_value.update(write_chunk)
                    checksum.update(write_chunk)
                    if verifier:
                        verifier.update(write_chunk)
                    fp = six.BytesIO(write_chunk)
                    fp.seek(0)
                    part = UploadPart(mpu, fp, cstart + 1, len(write_chunk))
                    pool.spawn_n(run_upload, s3_client, bucket, key, part)
                    plist.append(part)
                break

        pedict = {}
        total_size = 0
        pool.waitall()

        for part in plist:
            pedict.update(part.etag)
            total_size += part.size

        success = True
        for part in plist:
            if not part.success:
                success = False

        if success:
            # Complete
            mpu_list = self._get_mpu_list(pedict)
            s3_client.complete_multipart_upload(Bucket=bucket,
                                                Key=key,
                                                MultipartUpload=mpu_list,
                                                UploadId=upload_id)
            hash_hex = os_hash_value.hexdigest()
            checksum_hex = checksum.hexdigest()

            # Add store backend information to location metadata
            metadata = {}
            if self.backend_group:
                metadata['store'] = self.backend_group

            LOG.info("Multipart complete key=%(key)s "
                     "UploadId=%(UploadId)s "
                     "Wrote %(total_size)d bytes to S3 key "
                     "named %(key)s "
                     "with checksum %(checksum)s",
                     {'key': key, 'UploadId': upload_id,
                      'total_size': total_size, 'checksum': checksum_hex})
            return loc.get_uri(), total_size, checksum_hex, hash_hex, metadata

        # Abort
        s3_client.abort_multipart_upload(Bucket=bucket, Key=key,
                                         UploadId=upload_id)
        LOG.error("Some parts failed to upload to S3. "
                  "Aborted the key=%s", key)
        msg = _("Failed to add image object to S3. key=%s") % key
        raise glance_store.BackendException(msg)

    @capabilities.check
    def delete(self, location, context=None):
        """
        Takes a `glance_store.location.Location` object that indicates
        where to find the image file to delete.

        :param location: `glance_store.location.Location` object, supplied
                         from glance_store.location.get_location_from_uri()

        :raises: NotFound if image does not exist
        """
        loc = location.store_location
        s3_client, bucket, key = self._operation_set(loc)

        if not self._object_exists(s3_client, bucket, key):
            LOG.warning("Could not find key %(key)s in bucket %(bucket)s",
                        {'key': key, 'bucket': bucket})
            raise exceptions.NotFound(image=key)

        LOG.debug("Deleting image object from S3 using s3_host=%(s3_host)s, "
                  "accesskey=%(accesskey)s, bucket=%(bucket)s, key=%(key)s",
                  {'s3_host': loc.s3serviceurl, 'accesskey': loc.accesskey,
                   'bucket': bucket, 'key': key})

        return s3_client.delete_object(Bucket=bucket, Key=key)

    @staticmethod
    def _bucket_exists(s3_client, bucket):
        """Check whether the bucket exists in S3.

        :param s3_client: An object with credentials to connect to S3
        :param bucket: S3 bucket name
        :returns: boolean value; True if the bucket exists, False if it
                  does not.
        :raises: BadStoreConfiguration if cannot connect to S3 successfully
        """
        try:
            s3_client.head_bucket(Bucket=bucket)
        except boto_exceptions.ClientError as e:
            error_code = e.response['Error']['Code']
            if error_code == '404':
                return False
            msg = ("Failed to get bucket info: %s" %
                   encodeutils.exception_to_unicode(e))
            LOG.error(msg)
            raise glance_store.BadStoreConfiguration(store_name='s3',
                                                     reason=msg)
        else:
            return True

    @staticmethod
    def _object_exists(s3_client, bucket, key):
        """Check whether the object exists in the specified bucket of S3.

        :param s3_client: An object with credentials to connect to S3
        :param bucket: S3 bucket name
        :param key: The image object name
        :returns: boolean value; True if the object exists, False if it
                  does not.
        :raises: BadStoreConfiguration if cannot connect to S3 successfully
        """
        try:
            s3_client.head_object(Bucket=bucket, Key=key)
        except boto_exceptions.ClientError as e:
            error_code = e.response['Error']['Code']
            if error_code == '404':
                return False
            msg = ("Failed to get object info: %s" %
                   encodeutils.exception_to_unicode(e))
            LOG.error(msg)
            raise glance_store.BadStoreConfiguration(store_name='s3',
                                                     reason=msg)
        else:
            return True

    @staticmethod
    def _create_bucket(s3_client, s3_host, bucket):
        """Create a bucket in S3.

        :param s3_client: An object with credentials to connect to S3
        :param s3_host: S3 endpoint url
        :param bucket: S3 bucket name
        :raises: BadStoreConfiguration if cannot connect to S3 successfully
        """
        region = get_s3_location(s3_host)
        try:
            if region == '':
                s3_client.create_bucket(Bucket=bucket)
            else:
                s3_client.create_bucket(
                    Bucket=bucket,
                    CreateBucketConfiguration={
                        'LocationConstraint': region
                    }
                )
        except boto_exceptions.ClientError as e:
            msg = ("Failed to add bucket to S3: %s" %
                   encodeutils.exception_to_unicode(e))
            LOG.error(msg)
            raise glance_store.BadStoreConfiguration(store_name='s3',
                                                     reason=msg)

    @staticmethod
    def _get_mpu_list(pedict):
        """Convert a part-number-to-ETag dict into the structure expected
        by boto3.client('s3').complete_multipart_upload.

        :param pedict: dict containing the UploadPart.etag entries
        :returns: dict with pedict converted into the 'Parts' structure
        """
        return {
            'Parts': [
                {
                    'PartNumber': pnum,
                    'ETag': etag
                } for pnum, etag in six.iteritems(pedict)
            ]
        }
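
    # Sketch of the conversion performed above (ETag values hypothetical):
    #
    #     {1: '"etag1"', 2: '"etag2"'}
    #     -> {'Parts': [{'PartNumber': 1, 'ETag': '"etag1"'},
    #                   {'PartNumber': 2, 'ETag': '"etag2"'}]}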


def get_s3_location(s3_host):
    """Get S3 region information from ``s3_store_host``.

    :param s3_host: S3 endpoint url
    :returns: string value; the region the user wants to use on Amazon S3,
              or '' if an S3-compatible storage server is being used
    """
    locations = {
        's3.amazonaws.com': '',
        's3-us-east-1.amazonaws.com': 'us-east-1',
        's3-us-east-2.amazonaws.com': 'us-east-2',
        's3-us-west-1.amazonaws.com': 'us-west-1',
        's3-us-west-2.amazonaws.com': 'us-west-2',
        's3-ap-east-1.amazonaws.com': 'ap-east-1',
        's3-ap-south-1.amazonaws.com': 'ap-south-1',
        's3-ap-northeast-1.amazonaws.com': 'ap-northeast-1',
        's3-ap-northeast-2.amazonaws.com': 'ap-northeast-2',
        's3-ap-northeast-3.amazonaws.com': 'ap-northeast-3',
        's3-ap-southeast-1.amazonaws.com': 'ap-southeast-1',
        's3-ap-southeast-2.amazonaws.com': 'ap-southeast-2',
        's3-ca-central-1.amazonaws.com': 'ca-central-1',
        's3-cn-north-1.amazonaws.com.cn': 'cn-north-1',
        's3-cn-northwest-1.amazonaws.com.cn': 'cn-northwest-1',
        's3-eu-central-1.amazonaws.com': 'eu-central-1',
        's3-eu-west-1.amazonaws.com': 'eu-west-1',
        's3-eu-west-2.amazonaws.com': 'eu-west-2',
        's3-eu-west-3.amazonaws.com': 'eu-west-3',
        's3-eu-north-1.amazonaws.com': 'eu-north-1',
        's3-sa-east-1.amazonaws.com': 'sa-east-1'
    }
    # strip off scheme and port if present
    key = re.sub(r'^(https?://)?(?P<host>[^:]+[^/])(:[0-9]+)?/?$',
                 r'\g<host>',
                 s3_host)
    return locations.get(key, '')
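
# A doctest-style sketch of get_s3_location (the compatible-storage host
# below is hypothetical):
#
#     >>> get_s3_location('https://s3-us-west-2.amazonaws.com:443/')
#     'us-west-2'
#     >>> get_s3_location('http://my-object-storage.example.com')
#     ''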