1# Copyright 2010 OpenStack Foundation 2# All Rights Reserved. 3# 4# Licensed under the Apache License, Version 2.0 (the "License"); you may 5# not use this file except in compliance with the License. You may obtain 6# a copy of the License at 7# 8# http://www.apache.org/licenses/LICENSE-2.0 9# 10# Unless required by applicable law or agreed to in writing, software 11# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT 12# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the 13# License for the specific language governing permissions and limitations 14# under the License. 15 16"""Storage backend for S3 or Storage Servers that follow the S3 Protocol""" 17 18import logging 19import math 20import re 21 22from boto3 import session as boto_session 23from botocore import client as boto_client 24from botocore import exceptions as boto_exceptions 25from botocore import utils as boto_utils 26import eventlet 27from oslo_config import cfg 28from oslo_utils import encodeutils 29from oslo_utils import units 30import six 31from six.moves import urllib 32 33import glance_store 34from glance_store import capabilities 35from glance_store.common import utils 36import glance_store.driver 37from glance_store import exceptions 38from glance_store.i18n import _ 39import glance_store.location 40 41LOG = logging.getLogger(__name__) 42 43DEFAULT_LARGE_OBJECT_SIZE = 100 # 100M 44DEFAULT_LARGE_OBJECT_CHUNK_SIZE = 10 # 10M 45DEFAULT_LARGE_OBJECT_MIN_CHUNK_SIZE = 5 # 5M 46DEFAULT_THREAD_POOLS = 10 # 10 pools 47MAX_PART_NUM = 10000 # 10000 upload parts 48 49_S3_OPTS = [ 50 cfg.StrOpt('s3_store_host', 51 help=""" 52The host where the S3 server is listening. 53 54This configuration option sets the host of the S3 or S3 compatible storage 55Server. This option is required when using the S3 storage backend. 56The host can contain a DNS name (e.g. s3.amazonaws.com, my-object-storage.com) 57or an IP address (127.0.0.1). 58 59Possible values: 60 * A valid DNS name 61 * A valid IPv4 address 62 63Related Options: 64 * s3_store_access_key 65 * s3_store_secret_key 66 67"""), 68 cfg.StrOpt('s3_store_access_key', 69 secret=True, 70 help=""" 71The S3 query token access key. 72 73This configuration option takes the access key for authenticating with the 74Amazon S3 or S3 compatible storage server. This option is required when using 75the S3 storage backend. 76 77Possible values: 78 * Any string value that is the access key for a user with appropriate 79 privileges 80 81Related Options: 82 * s3_store_host 83 * s3_store_secret_key 84 85"""), 86 cfg.StrOpt('s3_store_secret_key', 87 secret=True, 88 help=""" 89The S3 query token secret key. 90 91This configuration option takes the secret key for authenticating with the 92Amazon S3 or S3 compatible storage server. This option is required when using 93the S3 storage backend. 94 95Possible values: 96 * Any string value that is a secret key corresponding to the access key 97 specified using the ``s3_store_host`` option 98 99Related Options: 100 * s3_store_host 101 * s3_store_access_key 102 103"""), 104 cfg.StrOpt('s3_store_bucket', 105 help=""" 106The S3 bucket to be used to store the Glance data. 107 108This configuration option specifies where the glance images will be stored 109in the S3. If ``s3_store_create_bucket_on_put`` is set to true, it will be 110created automatically even if the bucket does not exist. 111 112Possible values: 113 * Any string value 114 115Related Options: 116 * s3_store_create_bucket_on_put 117 * s3_store_bucket_url_format 118 119"""), 120 cfg.BoolOpt('s3_store_create_bucket_on_put', 121 default=False, 122 help=""" 123Determine whether S3 should create a new bucket. 124 125This configuration option takes boolean value to indicate whether Glance should 126create a new bucket to S3 if it does not exist. 127 128Possible values: 129 * Any Boolean value 130 131Related Options: 132 * None 133 134"""), 135 cfg.StrOpt('s3_store_bucket_url_format', 136 default='auto', 137 help=""" 138The S3 calling format used to determine the object. 139 140This configuration option takes access model that is used to specify the 141address of an object in an S3 bucket. 142 143NOTE: 144In ``path``-style, the endpoint for the object looks like 145'https://s3.amazonaws.com/bucket/example.img'. 146And in ``virtual``-style, the endpoint for the object looks like 147'https://bucket.s3.amazonaws.com/example.img'. 148If you do not follow the DNS naming convention in the bucket name, you can 149get objects in the path style, but not in the virtual style. 150 151Possible values: 152 * Any string value of ``auto``, ``virtual``, or ``path`` 153 154Related Options: 155 * s3_store_bucket 156 157"""), 158 cfg.IntOpt('s3_store_large_object_size', 159 default=DEFAULT_LARGE_OBJECT_SIZE, 160 help=""" 161What size, in MB, should S3 start chunking image files and do a multipart 162upload in S3. 163 164This configuration option takes a threshold in MB to determine whether to 165upload the image to S3 as is or to split it (Multipart Upload). 166 167Note: You can only split up to 10,000 images. 168 169Possible values: 170 * Any positive integer value 171 172Related Options: 173 * s3_store_large_object_chunk_size 174 * s3_store_thread_pools 175 176"""), 177 cfg.IntOpt('s3_store_large_object_chunk_size', 178 default=DEFAULT_LARGE_OBJECT_CHUNK_SIZE, 179 help=""" 180What multipart upload part size, in MB, should S3 use when uploading parts. 181 182This configuration option takes the image split size in MB for Multipart 183Upload. 184 185Note: You can only split up to 10,000 images. 186 187Possible values: 188 * Any positive integer value (must be greater than or equal to 5M) 189 190Related Options: 191 * s3_store_large_object_size 192 * s3_store_thread_pools 193 194"""), 195 cfg.IntOpt('s3_store_thread_pools', 196 default=DEFAULT_THREAD_POOLS, 197 help=""" 198The number of thread pools to perform a multipart upload in S3. 199 200This configuration option takes the number of thread pools when performing a 201Multipart Upload. 202 203Possible values: 204 * Any positive integer value 205 206Related Options: 207 * s3_store_large_object_size 208 * s3_store_large_object_chunk_size 209 210""") 211] 212 213 214class UploadPart(object): 215 """The class for the upload part.""" 216 def __init__(self, mpu, fp, partnum, chunks): 217 self.mpu = mpu 218 self.partnum = partnum 219 self.fp = fp 220 self.size = 0 221 self.chunks = chunks 222 self.etag = {} 223 self.success = True 224 225 226def run_upload(s3_client, bucket, key, part): 227 """Upload the upload part into S3 and set returned etag and size to its 228 part info. 229 230 :param s3_client: An object with credentials to connect to S3 231 :param bucket: The S3 bucket name 232 :param key: The object name to be stored (image identifier) 233 :param part: UploadPart object which used during multipart upload 234 """ 235 pnum = part.partnum 236 bsize = part.chunks 237 upload_id = part.mpu['UploadId'] 238 LOG.debug("Uploading upload part in S3 partnum=%(pnum)d, " 239 "size=%(bsize)d, key=%(key)s, UploadId=%(UploadId)s", 240 {'pnum': pnum, 'bsize': bsize, 'key': key, 241 'UploadId': upload_id}) 242 243 try: 244 key = s3_client.upload_part(Body=part.fp, 245 Bucket=bucket, 246 ContentLength=bsize, 247 Key=key, 248 PartNumber=pnum, 249 UploadId=upload_id) 250 part.etag[part.partnum] = key['ETag'] 251 part.size = bsize 252 except boto_exceptions.ClientError as e: 253 error_code = e.response['Error']['Code'] 254 error_message = e.response['Error']['Message'] 255 LOG.warning("Failed to upload part in S3 partnum=%(pnum)d, " 256 "size=%(bsize)d, error code=%(error_code)d, " 257 "error message=%(error_message)s", 258 {'pnum': pnum, 'bsize': bsize, 'error_code': error_code, 259 'error_message': error_message}) 260 part.success = False 261 finally: 262 part.fp.close() 263 264 265class StoreLocation(glance_store.location.StoreLocation): 266 """Class describing an S3 URI. 267 268 An S3 URI can look like any of the following: 269 270 s3://accesskey:secretkey@s3.amazonaws.com/bucket/key-id 271 s3+https://accesskey:secretkey@s3.amazonaws.com/bucket/key-id 272 273 The s3+https:// URIs indicate there is an HTTPS s3service URL 274 """ 275 def process_specs(self): 276 self.scheme = self.specs.get('scheme', 's3') 277 self.accesskey = self.specs.get('accesskey') 278 self.secretkey = self.specs.get('secretkey') 279 s3_host = self.specs.get('s3serviceurl') 280 self.bucket = self.specs.get('bucket') 281 self.key = self.specs.get('key') 282 283 if s3_host.startswith('https://'): 284 self.scheme = 's3+https' 285 s3_host = s3_host[len('https://'):].strip('/') 286 elif s3_host.startswith('http://'): 287 s3_host = s3_host[len('http://'):].strip('/') 288 self.s3serviceurl = s3_host.strip('/') 289 290 def _get_credstring(self): 291 if self.accesskey: 292 return '%s:%s@' % (self.accesskey, self.secretkey) 293 return '' 294 295 def get_uri(self): 296 return "%s://%s%s/%s/%s" % (self.scheme, self._get_credstring(), 297 self.s3serviceurl, self.bucket, self.key) 298 299 def parse_uri(self, uri): 300 """Parse URLs. 301 302 Note that an Amazon AWS secret key can contain the forward slash, 303 which is entirely retarded, and breaks urlparse miserably. 304 This function works around that issue. 305 """ 306 # Make sure that URIs that contain multiple schemes, such as: 307 # s3://accesskey:secretkey@https://s3.amazonaws.com/bucket/key-id 308 # are immediately rejected. 309 if uri.count('://') != 1: 310 reason = ("URI cannot contain more than one occurrence " 311 "of a scheme. If you have specified a URI like " 312 "s3://accesskey:secretkey@" 313 "https://s3.amazonaws.com/bucket/key-id" 314 ", you need to change it to use the " 315 "s3+https:// scheme, like so: " 316 "s3+https://accesskey:secretkey@" 317 "s3.amazonaws.com/bucket/key-id") 318 LOG.info("Invalid store uri: %s", reason) 319 raise exceptions.BadStoreUri(uri=uri) 320 321 pieces = urllib.parse.urlparse(uri) 322 self.validate_schemas(uri, valid_schemas=( 323 's3://', 's3+http://', 's3+https://')) 324 self.scheme = pieces.scheme 325 path = pieces.path.strip('/') 326 netloc = pieces.netloc.strip('/') 327 entire_path = (netloc + '/' + path).strip('/') 328 329 if '@' in uri: 330 creds, path = entire_path.split('@') 331 cred_parts = creds.split(':') 332 333 try: 334 self.accesskey = cred_parts[0] 335 self.secretkey = cred_parts[1] 336 except IndexError: 337 LOG.error("Badly formed S3 credentials") 338 raise exceptions.BadStoreUri(uri=uri) 339 else: 340 self.accesskey = None 341 path = entire_path 342 try: 343 path_parts = path.split('/') 344 self.key = path_parts.pop() 345 self.bucket = path_parts.pop() 346 if path_parts: 347 self.s3serviceurl = '/'.join(path_parts).strip('/') 348 else: 349 LOG.error("Badly formed S3 URI. Missing s3 service URL.") 350 raise exceptions.BadStoreUri(uri=uri) 351 except IndexError: 352 LOG.error("Badly formed S3 URI") 353 raise exceptions.BadStoreUri(uri=uri) 354 355 356class Store(glance_store.driver.Store): 357 """An implementation of the s3 adapter.""" 358 359 _CAPABILITIES = capabilities.BitMasks.RW_ACCESS 360 OPTIONS = _S3_OPTS 361 EXAMPLE_URL = "s3://<ACCESS_KEY>:<SECRET_KEY>@<S3_URL>/<BUCKET>/<OBJ>" 362 363 READ_CHUNKSIZE = 64 * units.Ki 364 WRITE_CHUNKSIZE = 5 * units.Mi 365 366 @staticmethod 367 def get_schemes(): 368 return 's3', 's3+http', 's3+https' 369 370 def configure_add(self): 371 """ 372 Configure the Store to use the stored configuration options 373 Any store that needs special configuration should implement 374 this method. If the store was not able to successfully configure 375 itself, it should raise `exceptions.BadStoreConfiguration` 376 """ 377 self.s3_host = self._option_get('s3_store_host') 378 self.access_key = self._option_get('s3_store_access_key') 379 self.secret_key = self._option_get('s3_store_secret_key') 380 self.bucket = self._option_get('s3_store_bucket') 381 382 self.scheme = 's3' 383 if self.s3_host.startswith('https://'): 384 self.scheme = 's3+https' 385 self.full_s3_host = self.s3_host 386 elif self.s3_host.startswith('http://'): 387 self.full_s3_host = self.s3_host 388 else: # Defaults http 389 self.full_s3_host = 'http://' + self.s3_host 390 391 _s3_obj_size = self._option_get('s3_store_large_object_size') 392 self.s3_store_large_object_size = _s3_obj_size * units.Mi 393 _s3_ck_size = self._option_get('s3_store_large_object_chunk_size') 394 _s3_ck_min = DEFAULT_LARGE_OBJECT_MIN_CHUNK_SIZE 395 if _s3_ck_size < _s3_ck_min: 396 reason = _("s3_store_large_object_chunk_size must be at " 397 "least %d MB.") % _s3_ck_min 398 LOG.error(reason) 399 raise exceptions.BadStoreConfiguration(store_name="s3", 400 reason=reason) 401 self.s3_store_large_object_chunk_size = _s3_ck_size * units.Mi 402 403 self.s3_store_thread_pools = self._option_get('s3_store_thread_pools') 404 if self.s3_store_thread_pools <= 0: 405 reason = _("s3_store_thread_pools must be a positive " 406 "integer. %s") % self.s3_store_thread_pools 407 LOG.error(reason) 408 raise exceptions.BadStoreConfiguration(store_name="s3", 409 reason=reason) 410 411 if self.backend_group: 412 self._set_url_prefix() 413 414 def _set_url_prefix(self): 415 s3_host = self.s3_host 416 if s3_host.startswith('http://'): 417 s3_host = s3_host[len('http://'):] 418 elif s3_host.startswith('https://'): 419 s3_host = s3_host[len('https://'):] 420 421 self._url_prefix = "%s://%s:%s@%s/%s" % (self.scheme, self.access_key, 422 self.secret_key, s3_host, 423 self.bucket) 424 425 def _option_get(self, param): 426 if self.backend_group: 427 store_conf = getattr(self.conf, self.backend_group) 428 else: 429 store_conf = self.conf.glance_store 430 431 result = getattr(store_conf, param) 432 if not result: 433 if param == 's3_store_create_bucket_on_put': 434 return result 435 reason = _("Could not find %s in configuration options.") % param 436 LOG.error(reason) 437 raise exceptions.BadStoreConfiguration(store_name="s3", 438 reason=reason) 439 return result 440 441 def _create_s3_client(self, loc): 442 """Create a client object to use when connecting to S3. 443 444 :param loc: `glance_store.location.Location` object, supplied 445 from glance_store.location.get_location_from_uri() 446 :returns: An object with credentials to connect to S3 447 """ 448 s3_host = self._option_get('s3_store_host') 449 url_format = self._option_get('s3_store_bucket_url_format') 450 calling_format = {'addressing_style': url_format} 451 452 session = boto_session.Session(aws_access_key_id=loc.accesskey, 453 aws_secret_access_key=loc.secretkey) 454 config = boto_client.Config(s3=calling_format) 455 location = get_s3_location(s3_host) 456 457 bucket_name = loc.bucket 458 if (url_format == 'virtual' and 459 not boto_utils.check_dns_name(bucket_name)): 460 raise boto_exceptions.InvalidDNSNameError(bucket_name=bucket_name) 461 462 region_name, endpoint_url = None, None 463 if location: 464 region_name = location 465 else: 466 endpoint_url = s3_host 467 468 return session.client(service_name='s3', 469 endpoint_url=endpoint_url, 470 region_name=region_name, 471 use_ssl=(loc.scheme == 's3+https'), 472 config=config) 473 474 def _operation_set(self, loc): 475 """Objects and variables frequently used when operating S3 are 476 returned together. 477 478 :param loc: `glance_store.location.Location` object, supplied 479 from glance_store.location.get_location_from_uri() 480 "returns: tuple of: (1) S3 client object, (2) Bucket name, 481 (3) Image Object name 482 """ 483 return self._create_s3_client(loc), loc.bucket, loc.key 484 485 @capabilities.check 486 def get(self, location, offset=0, chunk_size=None, context=None): 487 """ 488 Takes a `glance_store.location.Location` object that indicates 489 where to find the image file, and returns a tuple of generator 490 (for reading the image file) and image_size 491 492 :param location: `glance_store.location.Location` object, supplied 493 from glance_store.location.get_location_from_uri() 494 :raises: `glance_store.exceptions.NotFound` if image does not exist 495 """ 496 loc = location.store_location 497 s3_client, bucket, key = self._operation_set(loc) 498 499 if not self._object_exists(s3_client, bucket, key): 500 LOG.warning("Could not find key %(key)s in " 501 "bucket %(bucket)s", {'key': key, 'bucket': bucket}) 502 raise exceptions.NotFound(image=key) 503 504 key = s3_client.get_object(Bucket=bucket, Key=key) 505 506 LOG.debug("Retrieved image object from S3 using s3_host=%(s3_host)s, " 507 "access_key=%(accesskey)s, bucket=%(bucket)s, " 508 "key=%(key)s)", 509 {'s3_host': loc.s3serviceurl, 'accesskey': loc.accesskey, 510 'bucket': bucket, 'key': key}) 511 512 cs = self.READ_CHUNKSIZE 513 514 class ResponseIndexable(glance_store.Indexable): 515 def another(self): 516 try: 517 return next(self.wrapped) 518 except StopIteration: 519 return b'' 520 521 return (ResponseIndexable(utils.chunkiter(key['Body'], cs), 522 key['ContentLength']), key['ContentLength']) 523 524 def get_size(self, location, context=None): 525 """ 526 Takes a `glance_store.location.Location` object that indicates 527 where to find the image file and returns the image size 528 529 :param location: `glance_store.location.Location` object, supplied 530 from glance_store.location.get_location_from_uri() 531 :raises: `glance_store.exceptions.NotFound` if image does not exist 532 :rtype: int 533 """ 534 loc = location.store_location 535 s3_client, bucket, key = self._operation_set(loc) 536 537 if not self._object_exists(s3_client, bucket, key): 538 LOG.warning("Could not find key %(key)s in " 539 "bucket %(bucket)s", {'key': key, 'bucket': bucket}) 540 raise exceptions.NotFound(image=key) 541 542 key = s3_client.head_object(Bucket=bucket, Key=key) 543 return key['ContentLength'] 544 545 @capabilities.check 546 def add(self, image_id, image_file, image_size, hashing_algo, context=None, 547 verifier=None): 548 """ 549 Stores an image file with supplied identifier to the backend 550 storage system and returns a tuple containing information 551 about the stored image. 552 553 :param image_id: The opaque image identifier 554 :param image_file: The image data to write, as a file-like object 555 :param image_size: The size of the image data to write, in bytes 556 :param hashing_algo: A hashlib algorithm identifier (string) 557 :param context: A context object 558 :param verifier: An object used to verify signatures for images 559 560 :returns: tuple of: (1) URL in backing store, (2) bytes written, 561 (3) checksum, (4) multihash value, and (5) a dictionary 562 with storage system specific information 563 :raises: `glance_store.exceptions.Duplicate` if the image already 564 exists 565 """ 566 loc = StoreLocation(store_specs={'scheme': self.scheme, 567 'bucket': self.bucket, 568 'key': image_id, 569 's3serviceurl': self.full_s3_host, 570 'accesskey': self.access_key, 571 'secretkey': self.secret_key}, 572 conf=self.conf, 573 backend_group=self.backend_group) 574 575 s3_client, bucket, key = self._operation_set(loc) 576 577 if not self._bucket_exists(s3_client, bucket): 578 if self._option_get('s3_store_create_bucket_on_put'): 579 self._create_bucket(s3_client, 580 self._option_get('s3_store_host'), 581 bucket) 582 else: 583 msg = (_("The bucket %s does not exist in " 584 "S3. Please set the " 585 "s3_store_create_bucket_on_put option " 586 "to add bucket to S3 automatically.") % bucket) 587 raise glance_store.BackendException(msg) 588 589 LOG.debug("Adding image object to S3 using (s3_host=%(s3_host)s, " 590 "access_key=%(access_key)s, bucket=%(bucket)s, " 591 "key=%(key)s)", 592 {'s3_host': self.s3_host, 'access_key': loc.accesskey, 593 'bucket': bucket, 'key': key}) 594 595 if not self._object_exists(s3_client, bucket, key): 596 if image_size < self.s3_store_large_object_size: 597 return self._add_singlepart(s3_client=s3_client, 598 image_file=image_file, 599 bucket=bucket, 600 key=key, 601 loc=loc, 602 hashing_algo=hashing_algo, 603 verifier=verifier) 604 605 return self._add_multipart(s3_client=s3_client, 606 image_file=image_file, 607 image_size=image_size, 608 bucket=bucket, 609 key=key, 610 loc=loc, 611 hashing_algo=hashing_algo, 612 verifier=verifier) 613 LOG.warning("S3 already has an image with bucket ID %(bucket)s, " 614 "key %(key)s", {'bucket': bucket, 'key': key}) 615 raise exceptions.Duplicate(image=key) 616 617 def _add_singlepart(self, s3_client, image_file, bucket, key, loc, 618 hashing_algo, verifier): 619 """Stores an image file with a single part upload to S3 backend. 620 621 :param s3_client: An object with credentials to connect to S3 622 :param image_file: The image data to write, as a file-like object 623 :param bucket: S3 bucket name 624 :param key: The object name to be stored (image identifier) 625 :param loc: `glance_store.location.Location` object, supplied 626 from glance_store.location.get_location_from_uri() 627 :param hashing_algo: A hashlib algorithm identifier (string) 628 :param verifier: An object used to verify signatures for images 629 :returns: tuple of: (1) URL in backing store, (2) bytes written, 630 (3) checksum, (4) multihash value, and (5) a dictionary 631 with storage system specific information 632 """ 633 os_hash_value = utils.get_hasher(hashing_algo, False) 634 checksum = utils.get_hasher('md5', False) 635 image_data = b'' 636 image_size = 0 637 for chunk in utils.chunkreadable(image_file, self.WRITE_CHUNKSIZE): 638 image_data += chunk 639 image_size += len(chunk) 640 os_hash_value.update(chunk) 641 checksum.update(chunk) 642 if verifier: 643 verifier.update(chunk) 644 645 s3_client.put_object(Body=image_data, 646 Bucket=bucket, 647 Key=key) 648 hash_hex = os_hash_value.hexdigest() 649 checksum_hex = checksum.hexdigest() 650 651 # Add store backend information to location metadata 652 metadata = {} 653 if self.backend_group: 654 metadata['store'] = self.backend_group 655 656 LOG.debug("Wrote %(size)d bytes to S3 key named %(key)s " 657 "with checksum %(checksum)s", 658 {'size': image_size, 'key': key, 'checksum': checksum_hex}) 659 660 return loc.get_uri(), image_size, checksum_hex, hash_hex, metadata 661 662 def _add_multipart(self, s3_client, image_file, image_size, bucket, 663 key, loc, hashing_algo, verifier): 664 """Stores an image file with a multi part upload to S3 backend. 665 666 :param s3_client: An object with credentials to connect to S3 667 :param image_file: The image data to write, as a file-like object 668 :param bucket: S3 bucket name 669 :param key: The object name to be stored (image identifier) 670 :param loc: `glance_store.location.Location` object, supplied 671 from glance_store.location.get_location_from_uri() 672 :param hashing_algo: A hashlib algorithm identifier (string) 673 :param verifier: An object used to verify signatures for images 674 :returns: tuple of: (1) URL in backing store, (2) bytes written, 675 (3) checksum, (4) multihash value, and (5) a dictionary 676 with storage system specific information 677 """ 678 os_hash_value = utils.get_hasher(hashing_algo, False) 679 checksum = utils.get_hasher('md5', False) 680 pool_size = self.s3_store_thread_pools 681 pool = eventlet.greenpool.GreenPool(size=pool_size) 682 mpu = s3_client.create_multipart_upload(Bucket=bucket, Key=key) 683 upload_id = mpu['UploadId'] 684 LOG.debug("Multipart initiate key=%(key)s, UploadId=%(UploadId)s", 685 {'key': key, 'UploadId': upload_id}) 686 cstart = 0 687 plist = [] 688 689 chunk_size = int(math.ceil(float(image_size) / MAX_PART_NUM)) 690 write_chunk_size = max(self.s3_store_large_object_chunk_size, 691 chunk_size) 692 it = utils.chunkreadable(image_file, self.WRITE_CHUNKSIZE) 693 buffered_chunk = b'' 694 while True: 695 try: 696 buffered_clen = len(buffered_chunk) 697 if buffered_clen < write_chunk_size: 698 # keep reading data 699 read_chunk = next(it) 700 buffered_chunk += read_chunk 701 continue 702 else: 703 write_chunk = buffered_chunk[:write_chunk_size] 704 remained_data = buffered_chunk[write_chunk_size:] 705 os_hash_value.update(write_chunk) 706 checksum.update(write_chunk) 707 if verifier: 708 verifier.update(write_chunk) 709 fp = six.BytesIO(write_chunk) 710 fp.seek(0) 711 part = UploadPart(mpu, fp, cstart + 1, len(write_chunk)) 712 pool.spawn_n(run_upload, s3_client, bucket, key, part) 713 plist.append(part) 714 cstart += 1 715 buffered_chunk = remained_data 716 except StopIteration: 717 if len(buffered_chunk) > 0: 718 # Write the last chunk data 719 write_chunk = buffered_chunk 720 os_hash_value.update(write_chunk) 721 checksum.update(write_chunk) 722 if verifier: 723 verifier.update(write_chunk) 724 fp = six.BytesIO(write_chunk) 725 fp.seek(0) 726 part = UploadPart(mpu, fp, cstart + 1, len(write_chunk)) 727 pool.spawn_n(run_upload, s3_client, bucket, key, part) 728 plist.append(part) 729 break 730 731 pedict = {} 732 total_size = 0 733 pool.waitall() 734 735 for part in plist: 736 pedict.update(part.etag) 737 total_size += part.size 738 739 success = True 740 for part in plist: 741 if not part.success: 742 success = False 743 744 if success: 745 # Complete 746 mpu_list = self._get_mpu_list(pedict) 747 s3_client.complete_multipart_upload(Bucket=bucket, 748 Key=key, 749 MultipartUpload=mpu_list, 750 UploadId=upload_id) 751 hash_hex = os_hash_value.hexdigest() 752 checksum_hex = checksum.hexdigest() 753 754 # Add store backend information to location metadata 755 metadata = {} 756 if self.backend_group: 757 metadata['store'] = self.backend_group 758 759 LOG.info("Multipart complete key=%(key)s " 760 "UploadId=%(UploadId)s " 761 "Wrote %(total_size)d bytes to S3 key " 762 "named %(key)s " 763 "with checksum %(checksum)s", 764 {'key': key, 'UploadId': upload_id, 765 'total_size': total_size, 'checksum': checksum_hex}) 766 return loc.get_uri(), total_size, checksum_hex, hash_hex, metadata 767 768 # Abort 769 s3_client.abort_multipart_upload(Bucket=bucket, Key=key, 770 UploadId=upload_id) 771 LOG.error("Some parts failed to upload to S3. " 772 "Aborted the key=%s", key) 773 msg = _("Failed to add image object to S3. key=%s") % key 774 raise glance_store.BackendException(msg) 775 776 @capabilities.check 777 def delete(self, location, context=None): 778 """ 779 Takes a `glance_store.location.Location` object that indicates 780 where to find the image file to delete. 781 782 :param location: `glance_store.location.Location` object, supplied 783 from glance_store.location.get_location_from_uri() 784 785 :raises: NotFound if image does not exist; 786 InUseByStore if image is in use or snapshot unprotect failed 787 """ 788 loc = location.store_location 789 s3_client, bucket, key = self._operation_set(loc) 790 791 if not self._object_exists(s3_client, bucket, key): 792 LOG.warning("Could not find key %(key)s in bucket %(bucket)s", 793 {'key': key, 'bucket': bucket}) 794 raise exceptions.NotFound(image=key) 795 796 LOG.debug("Deleting image object from S3 using s3_host=%(s3_host)s, " 797 "accesskey=%(accesskey)s, bucket=%(bucket)s, key=%(key)s)", 798 {'s3_host': loc.s3serviceurl, 'accesskey': loc.accesskey, 799 'bucket': bucket, 'key': key}) 800 801 return s3_client.delete_object(Bucket=bucket, Key=key) 802 803 @staticmethod 804 def _bucket_exists(s3_client, bucket): 805 """Check whether bucket exists in the S3. 806 807 :param s3_client: An object with credentials to connect to S3 808 :param bucket: S3 bucket name 809 :returns: boolean value; If the value is true, the bucket is exist 810 if false, it is not. 811 :raises: BadStoreConfiguration if cannot connect to S3 successfully 812 """ 813 try: 814 s3_client.head_bucket(Bucket=bucket) 815 except boto_exceptions.ClientError as e: 816 error_code = e.response['Error']['Code'] 817 if error_code == '404': 818 return False 819 msg = ("Failed to get bucket info: %s" % 820 encodeutils.exception_to_unicode(e)) 821 LOG.error(msg) 822 raise glance_store.BadStoreConfiguration(store_name='s3', 823 reason=msg) 824 else: 825 return True 826 827 @staticmethod 828 def _object_exists(s3_client, bucket, key): 829 """Check whether object exists in the specific bucket of S3. 830 831 :param s3_client: An object with credentials to connect to S3 832 :param bucket: S3 bucket name 833 :param key: The image object name 834 :returns: boolean value; If the value is true, the object is exist 835 if false, it is not. 836 :raises: BadStoreConfiguration if cannot connect to S3 successfully 837 """ 838 try: 839 s3_client.head_object(Bucket=bucket, Key=key) 840 except boto_exceptions.ClientError as e: 841 error_code = e.response['Error']['Code'] 842 if error_code == '404': 843 return False 844 msg = ("Failed to get object info: %s" % 845 encodeutils.exception_to_unicode(e)) 846 LOG.error(msg) 847 raise glance_store.BadStoreConfiguration(store_name='s3', 848 reason=msg) 849 else: 850 return True 851 852 @staticmethod 853 def _create_bucket(s3_client, s3_host, bucket): 854 """Create bucket into the S3. 855 856 :param s3_client: An object with credentials to connect to S3 857 :param s3_host: S3 endpoint url 858 :param bucket: S3 bucket name 859 :raises: BadStoreConfiguration if cannot connect to S3 successfully 860 """ 861 region = get_s3_location(s3_host) 862 try: 863 s3_client.create_bucket( 864 Bucket=bucket, 865 ) if region == '' else s3_client.create_bucket( 866 Bucket=bucket, 867 CreateBucketConfiguration={ 868 'LocationConstraint': region 869 } 870 ) 871 except boto_exceptions.ClientError as e: 872 msg = ("Failed to add bucket to S3: %s" % 873 encodeutils.exception_to_unicode(e)) 874 LOG.error(msg) 875 raise glance_store.BadStoreConfiguration(store_name='s3', 876 reason=msg) 877 878 @staticmethod 879 def _get_mpu_list(pedict): 880 """Convert an object type and struct for use in 881 boto3.client('s3').complete_multipart_upload. 882 883 :param pedict: dict which containing UploadPart.etag 884 :returns: list with pedict converted properly 885 """ 886 return { 887 'Parts': [ 888 { 889 'PartNumber': pnum, 890 'ETag': etag 891 } for pnum, etag in six.iteritems(pedict) 892 ] 893 } 894 895 896def get_s3_location(s3_host): 897 """Get S3 region information from ``s3_store_host``. 898 899 :param s3_host: S3 endpoint url 900 :returns: string value; region information which user wants to use on 901 Amazon S3, and if user wants to use S3 compatible storage, 902 returns '' 903 """ 904 locations = { 905 's3.amazonaws.com': '', 906 's3-us-east-1.amazonaws.com': 'us-east-1', 907 's3-us-east-2.amazonaws.com': 'us-east-2', 908 's3-us-west-1.amazonaws.com': 'us-west-1', 909 's3-us-west-2.amazonaws.com': 'us-west-2', 910 's3-ap-east-1.amazonaws.com': 'ap-east-1', 911 's3-ap-south-1.amazonaws.com': 'ap-south-1', 912 's3-ap-northeast-1.amazonaws.com': 'ap-northeast-1', 913 's3-ap-northeast-2.amazonaws.com': 'ap-northeast-2', 914 's3-ap-northeast-3.amazonaws.com': 'ap-northeast-3', 915 's3-ap-southeast-1.amazonaws.com': 'ap-southeast-1', 916 's3-ap-southeast-2.amazonaws.com': 'ap-southeast-2', 917 's3-ca-central-1.amazonaws.com': 'ca-central-1', 918 's3-cn-north-1.amazonaws.com.cn': 'cn-north-1', 919 's3-cn-northwest-1.amazonaws.com.cn': 'cn-northwest-1', 920 's3-eu-central-1.amazonaws.com': 'eu-central-1', 921 's3-eu-west-1.amazonaws.com': 'eu-west-1', 922 's3-eu-west-2.amazonaws.com': 'eu-west-2', 923 's3-eu-west-3.amazonaws.com': 'eu-west-3', 924 's3-eu-north-1.amazonaws.com': 'eu-north-1', 925 's3-sa-east-1.amazonaws.com': 'sa-east-1' 926 } 927 # strip off scheme and port if present 928 key = re.sub(r'^(https?://)?(?P<host>[^:]+[^/])(:[0-9]+)?/?$', 929 r'\g<host>', 930 s3_host) 931 return locations.get(key, '') 932