# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# import types so that we can reference ListType in sphinx param declarations.
# We can't just use list, because sphinx gets confused by
# openstack.resource.Resource.list and openstack.resource2.Resource.list
import collections
import concurrent.futures
import hashlib
import json
import os
import types  # noqa
import urllib.parse

import keystoneauth1.exceptions

from openstack.cloud import _normalize
from openstack.cloud import _utils
from openstack.cloud import exc
from openstack import exceptions
from openstack import proxy
from openstack import utils


DEFAULT_OBJECT_SEGMENT_SIZE = 1073741824  # 1GB
# This halves the current default for Swift
DEFAULT_MAX_FILE_SIZE = (5 * 1024 * 1024 * 1024 + 2) / 2


OBJECT_CONTAINER_ACLS = {
    'public': '.r:*,.rlistings',
    'private': '',
}


class ObjectStoreCloudMixin(_normalize.Normalizer):

    @property
    def _object_store_client(self):
        if 'object-store' not in self._raw_clients:
            raw_client = self._get_raw_client('object-store')
            self._raw_clients['object-store'] = raw_client
        return self._raw_clients['object-store']

    def list_containers(self, full_listing=True, prefix=None):
        """List containers.

        :param full_listing: Ignored. Present for backwards compat.
        :param string prefix:
            Only containers with this prefix will be returned. (optional)

        :returns: list of Munch of the container objects

        :raises: OpenStackCloudException on operation error.
        """
        params = dict(format='json', prefix=prefix)
        response = self.object_store.get('/', params=params)
        return self._get_and_munchify(None, proxy._json_response(response))

    def search_containers(self, name=None, filters=None):
        """Search containers.

        :param string name: container name.
        :param filters: a dict containing additional filters to use.
            OR
            A string containing a jmespath expression for further filtering.
            Example:: "[?last_name==`Smith`] | [?other.gender==`Female`]"

        :returns: a list of ``munch.Munch`` containing the containers.

        :raises: ``OpenStackCloudException``: if something goes wrong during
            the OpenStack API call.
        """
        containers = self.list_containers()
        return _utils._filter_list(containers, name, filters)

    def get_container(self, name, skip_cache=False):
        """Get metadata about a container.

        :param str name:
            Name of the container to get metadata for.
        :param bool skip_cache:
            Ignore the cache of container metadata for this container.
            Defaults to ``False``.
        """
        if skip_cache or name not in self._container_cache:
            try:
                response = self.object_store.head(
                    self._get_object_endpoint(name)
                )
                exceptions.raise_from_response(response)
                self._container_cache[name] = response.headers
            except exc.OpenStackCloudHTTPError as e:
                if e.response.status_code == 404:
                    return None
                raise
        return self._container_cache[name]
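
    # Usage sketch (illustrative only; assumes ``cloud`` is an authenticated
    # connection such as one returned by ``openstack.connect()``):
    #
    #   for container in cloud.list_containers(prefix='backup'):
    #       print(container['name'])
    #   meta = cloud.get_container('backups', skip_cache=True)
    #   if meta is not None:
    #       print(meta.get('x-container-object-count'))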

    def create_container(self, name, public=False):
        """Create an object-store container.

        :param str name:
            Name of the container to create.
        :param bool public:
            Whether to set this container to be public. Defaults to
            ``False``.
        """
        container = self.get_container(name)
        if container:
            return container
        exceptions.raise_from_response(self.object_store.put(
            self._get_object_endpoint(name)
        ))
        if public:
            self.set_container_access(name, 'public')
        return self.get_container(name, skip_cache=True)

    def delete_container(self, name):
        """Delete an object-store container.

        :param str name: Name of the container to delete.
        """
        try:
            exceptions.raise_from_response(self.object_store.delete(
                self._get_object_endpoint(name)
            ))
            self._container_cache.pop(name, None)
            return True
        except exc.OpenStackCloudHTTPError as e:
            if e.response.status_code == 404:
                return False
            if e.response.status_code == 409:
                raise exc.OpenStackCloudException(
                    'Attempt to delete container {container} failed. The'
                    ' container is not empty. Please delete the objects'
                    ' inside it before deleting the container'.format(
                        container=name))
            raise

    def update_container(self, name, headers):
        """Update the metadata in a container.

        :param str name:
            Name of the container to update.
        :param dict headers:
            Key/Value headers to set on the container.
        """
        exceptions.raise_from_response(
            self.object_store.post(
                self._get_object_endpoint(name), headers=headers)
        )

    def set_container_access(self, name, access):
        """Set the access control list on a container.

        :param str name:
            Name of the container.
        :param str access:
            Access level to set on the container. Must be ``public``
            or ``private``, which are translated into the appropriate
            Swift ACL strings.
        """
        if access not in OBJECT_CONTAINER_ACLS:
            raise exc.OpenStackCloudException(
                "Invalid container access specified: %s. Must be one of %s"
                % (access, list(OBJECT_CONTAINER_ACLS.keys())))
        header = {'x-container-read': OBJECT_CONTAINER_ACLS[access]}
        self.update_container(name, header)

    def get_container_access(self, name):
        """Get the access control list from a container.

        :param str name: Name of the container.
        """
        container = self.get_container(name, skip_cache=True)
        if not container:
            raise exc.OpenStackCloudException(
                "Container not found: %s" % name)
        acl = container.get('x-container-read', '')
        for key, value in OBJECT_CONTAINER_ACLS.items():
            # Convert to string for the comparison because swiftclient
            # returns byte values as bytes sometimes and apparently ==
            # on bytes doesn't work like you'd think
            if str(acl) == str(value):
                return key
        raise exc.OpenStackCloudException(
            "Could not determine container access for ACL: %s." % acl)
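
    # Usage sketch (illustrative only; assumes an authenticated ``cloud``
    # connection):
    #
    #   cloud.create_container('www', public=True)
    #   assert cloud.get_container_access('www') == 'public'
    #   cloud.set_container_access('www', 'private')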

    def _get_file_hashes(self, filename):
        file_key = "{filename}:{mtime}".format(
            filename=filename,
            mtime=os.stat(filename).st_mtime)
        if file_key not in self._file_hash_cache:
            self.log.debug(
                'Calculating hashes for %(filename)s', {'filename': filename})
            (md5, sha256) = (None, None)
            with open(filename, 'rb') as file_obj:
                (md5, sha256) = self._calculate_data_hashes(file_obj)
            self._file_hash_cache[file_key] = dict(
                md5=md5, sha256=sha256)
        self.log.debug(
            "Image file %(filename)s md5:%(md5)s sha256:%(sha256)s",
            {'filename': filename,
             'md5': self._file_hash_cache[file_key]['md5'],
             'sha256': self._file_hash_cache[file_key]['sha256']})
        return (self._file_hash_cache[file_key]['md5'],
                self._file_hash_cache[file_key]['sha256'])

    def _calculate_data_hashes(self, data):
        md5 = utils.md5(usedforsecurity=False)
        sha256 = hashlib.sha256()

        if hasattr(data, 'read'):
            for chunk in iter(lambda: data.read(8192), b''):
                md5.update(chunk)
                sha256.update(chunk)
        else:
            md5.update(data)
            sha256.update(data)
        return (md5.hexdigest(), sha256.hexdigest())

    @_utils.cache_on_arguments()
    def get_object_capabilities(self):
        """Get information about the object-storage service.

        The object-storage service publishes a set of capabilities that
        include metadata about maximum values and thresholds.
        """
        # The endpoint in the catalog has version and project-id in it
        # To get capabilities, we have to disassemble and reassemble the URL
        # This logic is taken from swiftclient
        endpoint = urllib.parse.urlparse(self.object_store.get_endpoint())
        url = "{scheme}://{netloc}/info".format(
            scheme=endpoint.scheme, netloc=endpoint.netloc)

        return proxy._json_response(self.object_store.get(url))

    def get_object_segment_size(self, segment_size):
        """Get a segment size that will work given capabilities."""
        if segment_size is None:
            segment_size = DEFAULT_OBJECT_SEGMENT_SIZE
        min_segment_size = 0
        try:
            caps = self.get_object_capabilities()
        except exc.OpenStackCloudHTTPError as e:
            if e.response.status_code in (404, 412):
                server_max_file_size = DEFAULT_MAX_FILE_SIZE
                self.log.info(
                    "Swift capabilities not supported. "
                    "Using default max file size.")
            else:
                raise
        else:
            server_max_file_size = caps.get('swift', {}).get('max_file_size',
                                                             0)
            min_segment_size = caps.get('slo', {}).get('min_segment_size', 0)

        if segment_size > server_max_file_size:
            return server_max_file_size
        if segment_size < min_segment_size:
            return min_segment_size
        return segment_size
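
    # Sketch of the clamping behaviour above (illustrative server values:
    # max_file_size=5368709122, min_segment_size=1048576):
    #
    #   cloud.get_object_segment_size(None)      # -> 1073741824 (default)
    #   cloud.get_object_segment_size(10 ** 12)  # -> 5368709122 (clamped down)
    #   cloud.get_object_segment_size(1024)      # -> 1048576 (raised to min)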
289 """ 290 metadata = self.get_object_metadata(container, name) 291 if not metadata: 292 self.log.debug( 293 "swift stale check, no object: {container}/{name}".format( 294 container=container, name=name)) 295 return True 296 297 if not (file_md5 or file_sha256): 298 (file_md5, file_sha256) = self._get_file_hashes(filename) 299 md5_key = metadata.get( 300 self._OBJECT_MD5_KEY, metadata.get(self._SHADE_OBJECT_MD5_KEY, '')) 301 sha256_key = metadata.get( 302 self._OBJECT_SHA256_KEY, metadata.get( 303 self._SHADE_OBJECT_SHA256_KEY, '')) 304 up_to_date = self._hashes_up_to_date( 305 md5=file_md5, sha256=file_sha256, 306 md5_key=md5_key, sha256_key=sha256_key) 307 308 if not up_to_date: 309 self.log.debug( 310 "swift checksum mismatch: " 311 " %(filename)s!=%(container)s/%(name)s", 312 {'filename': filename, 'container': container, 'name': name}) 313 return True 314 315 self.log.debug( 316 "swift object up to date: %(container)s/%(name)s", 317 {'container': container, 'name': name}) 318 return False 319 320 def create_directory_marker_object(self, container, name, **headers): 321 """Create a zero-byte directory marker object 322 323 .. note:: 324 325 This method is not needed in most cases. Modern swift does not 326 require directory marker objects. However, some swift installs may 327 need these. 328 329 When using swift Static Web and Web Listings to serve static content 330 one may need to create a zero-byte object to represent each 331 "directory". Doing so allows Web Listings to generate an index of the 332 objects inside of it, and allows Static Web to render index.html 333 "files" that are "inside" the directory. 334 335 :param container: The name of the container. 336 :param name: Name for the directory marker object within the container. 337 :param headers: These will be passed through to the object creation 338 API as HTTP Headers. 339 """ 340 headers['content-type'] = 'application/directory' 341 342 return self.create_object( 343 container, 344 name, 345 data='', 346 generate_checksums=False, 347 **headers) 348 349 def create_object( 350 self, container, name, filename=None, 351 md5=None, sha256=None, segment_size=None, 352 use_slo=True, metadata=None, 353 generate_checksums=None, data=None, 354 **headers): 355 """Create a file object. 356 357 Automatically uses large-object segments if needed. 358 359 :param container: The name of the container to store the file in. 360 This container will be created if it does not exist already. 361 :param name: Name for the object within the container. 362 :param filename: The path to the local file whose contents will be 363 uploaded. Mutually exclusive with data. 364 :param data: The content to upload to the object. Mutually exclusive 365 with filename. 366 :param md5: A hexadecimal md5 of the file. (Optional), if it is known 367 and can be passed here, it will save repeating the expensive md5 368 process. It is assumed to be accurate. 369 :param sha256: A hexadecimal sha256 of the file. (Optional) See md5. 370 :param segment_size: Break the uploaded object into segments of this 371 many bytes. (Optional) Shade will attempt to discover the maximum 372 value for this from the server if it is not specified, or will use 373 a reasonable default. 374 :param headers: These will be passed through to the object creation 375 API as HTTP Headers. 376 :param use_slo: If the object is large enough to need to be a Large 377 Object, use a static rather than dynamic object. Static Objects 378 will delete segment objects when the manifest object is deleted. 

    def create_object(
            self, container, name, filename=None,
            md5=None, sha256=None, segment_size=None,
            use_slo=True, metadata=None,
            generate_checksums=None, data=None,
            **headers):
        """Create a file object.

        Automatically uses large-object segments if needed.

        :param container: The name of the container to store the file in.
            This container will be created if it does not exist already.
        :param name: Name for the object within the container.
        :param filename: The path to the local file whose contents will be
            uploaded. Mutually exclusive with data.
        :param data: The content to upload to the object. Mutually exclusive
            with filename.
        :param md5: A hexadecimal md5 of the file. (Optional) If it is known
            and can be passed here, it will save repeating the expensive md5
            process. It is assumed to be accurate.
        :param sha256: A hexadecimal sha256 of the file. (Optional) See md5.
        :param segment_size: Break the uploaded object into segments of this
            many bytes. (Optional) Shade will attempt to discover the maximum
            value for this from the server if it is not specified, or will use
            a reasonable default.
        :param headers: These will be passed through to the object creation
            API as HTTP Headers.
        :param use_slo: If the object is large enough to need to be a Large
            Object, use a static rather than dynamic object. Static Objects
            will delete segment objects when the manifest object is deleted.
            (optional, defaults to True)
        :param generate_checksums: Whether to generate checksums on the client
            side that get added to headers for later prevention of double
            uploads of identical data. (optional, defaults to True)
        :param metadata: This dict will get changed into headers that set
            metadata of the object.

        :raises: ``OpenStackCloudException`` on operation error.
        """
        if data is not None and filename:
            raise ValueError(
                "Both filename and data given. Please choose one.")
        if data is not None and not name:
            raise ValueError(
                "name is a required parameter when data is given")
        if data is not None and generate_checksums:
            raise ValueError(
                "checksums cannot be generated with data parameter")
        if generate_checksums is None:
            if data is not None:
                generate_checksums = False
            else:
                generate_checksums = True

        if not metadata:
            metadata = {}

        if not filename and data is None:
            filename = name

        if generate_checksums and (md5 is None or sha256 is None):
            (md5, sha256) = self._get_file_hashes(filename)
        if md5:
            headers[self._OBJECT_MD5_KEY] = md5 or ''
        if sha256:
            headers[self._OBJECT_SHA256_KEY] = sha256 or ''
        for (k, v) in metadata.items():
            if not k.lower().startswith('x-object-meta-'):
                headers['x-object-meta-' + k] = v
            else:
                headers[k] = v

        endpoint = self._get_object_endpoint(container, name)

        if data is not None:
            self.log.debug(
                "swift uploading data to %(endpoint)s",
                {'endpoint': endpoint})

            return self._upload_object_data(endpoint, data, headers)

        # segment_size gets used as a step value in a range call, so needs
        # to be an int
        if segment_size:
            segment_size = int(segment_size)
        segment_size = self.get_object_segment_size(segment_size)
        file_size = os.path.getsize(filename)

        if self.is_object_stale(container, name, filename, md5, sha256):

            self.log.debug(
                "swift uploading %(filename)s to %(endpoint)s",
                {'filename': filename, 'endpoint': endpoint})

            if file_size <= segment_size:
                self._upload_object(endpoint, filename, headers)
            else:
                self._upload_large_object(
                    endpoint, filename, headers,
                    file_size, segment_size, use_slo)

    def _upload_object_data(self, endpoint, data, headers):
        return proxy._json_response(self.object_store.put(
            endpoint, headers=headers, data=data))

    def _upload_object(self, endpoint, filename, headers):
        return proxy._json_response(self.object_store.put(
            endpoint, headers=headers, data=open(filename, 'rb')))

    def _get_file_segments(self, endpoint, filename, file_size, segment_size):
        # Use an ordered dict here so that testing can replicate things
        segments = collections.OrderedDict()
        for (index, offset) in enumerate(range(0, file_size, segment_size)):
            remaining = file_size - (index * segment_size)
            segment = _utils.FileSegment(
                filename, offset,
                segment_size if segment_size < remaining else remaining)
            name = '{endpoint}/{index:0>6}'.format(
                endpoint=endpoint, index=index)
            segments[name] = segment
        return segments
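
    # Usage sketch (illustrative only; assumes an authenticated ``cloud``
    # connection):
    #
    #   # Small uploads go up in a single PUT; large ones are segmented.
    #   cloud.create_object('backups', 'notes.txt', data='hello')
    #   cloud.create_object(
    #       'backups', 'image.qcow2', filename='/tmp/image.qcow2',
    #       segment_size=1024 * 1024 * 1024,
    #       metadata={'purpose': 'nightly-backup'})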

    def _object_name_from_url(self, url):
        '''Get container_name/object_name from the full URL called.

        Remove the Swift endpoint from the front of the URL, and remove
        the leading / that is left behind.'''
        endpoint = self.object_store.get_endpoint()
        object_name = url.replace(endpoint, '')
        if object_name.startswith('/'):
            object_name = object_name[1:]
        return object_name

    def _add_etag_to_manifest(self, segment_results, manifest):
        for result in segment_results:
            if 'Etag' not in result.headers:
                continue
            name = self._object_name_from_url(result.url)
            for entry in manifest:
                if entry['path'] == '/{name}'.format(name=name):
                    entry['etag'] = result.headers['Etag']
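
    # Sketch of the URL-to-name mapping above (illustrative endpoint and
    # segment name):
    #
    #   # endpoint = 'https://swift.example.com/v1/AUTH_abc'
    #   # url      = 'https://swift.example.com/v1/AUTH_abc/backups/seg/000001'
    #   # _object_name_from_url(url) -> 'backups/seg/000001'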
" 563 "Removing segment uploads.", segment_prefix) 564 self.delete_autocreated_image_objects( 565 segment_prefix=segment_prefix) 566 except Exception: 567 self.log.exception( 568 "Failed to cleanup image objects for %s:", 569 segment_prefix) 570 raise 571 572 def _finish_large_object_slo(self, endpoint, headers, manifest): 573 # TODO(mordred) send an etag of the manifest, which is the md5sum 574 # of the concatenation of the etags of the results 575 headers = headers.copy() 576 retries = 3 577 while True: 578 try: 579 return self._object_store_client.put( 580 endpoint, 581 params={'multipart-manifest': 'put'}, 582 headers=headers, data=json.dumps(manifest)) 583 except Exception: 584 retries -= 1 585 if retries == 0: 586 raise 587 588 def _finish_large_object_dlo(self, endpoint, headers): 589 headers = headers.copy() 590 headers['X-Object-Manifest'] = endpoint 591 retries = 3 592 while True: 593 try: 594 return self._object_store_client.put(endpoint, headers=headers) 595 except Exception: 596 retries -= 1 597 if retries == 0: 598 raise 599 600 def update_object(self, container, name, metadata=None, **headers): 601 """Update the metadata of an object 602 603 :param container: The name of the container the object is in 604 :param name: Name for the object within the container. 605 :param metadata: This dict will get changed into headers that set 606 metadata of the object 607 :param headers: These will be passed through to the object update 608 API as HTTP Headers. 609 610 :raises: ``OpenStackCloudException`` on operation error. 611 """ 612 if not metadata: 613 metadata = {} 614 615 metadata_headers = {} 616 617 for (k, v) in metadata.items(): 618 metadata_headers['x-object-meta-' + k] = v 619 620 headers = dict(headers, **metadata_headers) 621 622 return self._object_store_client.post( 623 self._get_object_endpoint(container, name), 624 headers=headers) 625 626 def list_objects(self, container, full_listing=True, prefix=None): 627 """List objects. 628 629 :param container: Name of the container to list objects in. 630 :param full_listing: Ignored. Present for backwards compat 631 :param string prefix: 632 only objects with this prefix will be returned. 633 (optional) 634 635 :returns: list of Munch of the objects 636 637 :raises: OpenStackCloudException on operation error. 638 """ 639 params = dict(format='json', prefix=prefix) 640 data = self._object_store_client.get(container, params=params) 641 return self._get_and_munchify(None, data) 642 643 def search_objects(self, container, name=None, filters=None): 644 """Search objects. 645 646 :param string name: object name. 647 :param filters: a dict containing additional filters to use. 648 OR 649 A string containing a jmespath expression for further filtering. 650 Example:: "[?last_name==`Smith`] | [?other.gender]==`Female`]" 651 652 :returns: a list of ``munch.Munch`` containing the objects. 653 654 :raises: ``OpenStackCloudException``: if something goes wrong during 655 the OpenStack API call. 656 """ 657 objects = self.list_objects(container) 658 return _utils._filter_list(objects, name, filters) 659 660 def delete_object(self, container, name, meta=None): 661 """Delete an object from a container. 662 663 :param string container: Name of the container holding the object. 664 :param string name: Name of the object to delete. 665 :param dict meta: Metadata for the object in question. (optional, will 666 be fetched if not provided) 667 668 :returns: True if delete succeeded, False if the object was not found. 

    def delete_object(self, container, name, meta=None):
        """Delete an object from a container.

        :param string container: Name of the container holding the object.
        :param string name: Name of the object to delete.
        :param dict meta: Metadata for the object in question. (optional, will
            be fetched if not provided)

        :returns: True if delete succeeded, False if the object was not found.

        :raises: OpenStackCloudException on operation error.
        """
        # TODO(mordred) DELETE for swift returns status in text/plain format
        # like so:
        #   Number Deleted: 15
        #   Number Not Found: 0
        #   Response Body:
        #   Response Status: 200 OK
        #   Errors:
        # We should ultimately do something with that
        try:
            if not meta:
                meta = self.get_object_metadata(container, name)
            if not meta:
                return False
            params = {}
            if meta.get('X-Static-Large-Object', None) == 'True':
                params['multipart-manifest'] = 'delete'
            self._object_store_client.delete(
                self._get_object_endpoint(container, name),
                params=params)
            return True
        except exc.OpenStackCloudHTTPError:
            return False

    def delete_autocreated_image_objects(self, container=None,
                                         segment_prefix=None):
        """Delete all objects autocreated for image uploads.

        This method should generally not be needed, as shade should clean up
        the objects it uses for object-based image creation. If something
        goes wrong and it is found that there are leaked objects, this method
        can be used to delete any objects that shade has created on the user's
        behalf in service of image uploads.

        :param str container: Name of the container. Defaults to 'images'.
        :param str segment_prefix: Prefix for the image segment names to
            delete. If not given, all image upload segments present are
            deleted.
        """
        if container is None:
            container = self._OBJECT_AUTOCREATE_CONTAINER
        # This method only makes sense on clouds that use tasks
        if not self.image_api_use_tasks:
            return False

        deleted = False
        for obj in self.list_objects(container, prefix=segment_prefix):
            meta = self.get_object_metadata(container, obj['name'])
            if meta.get(
                    self._OBJECT_AUTOCREATE_KEY, meta.get(
                        self._SHADE_OBJECT_AUTOCREATE_KEY)) == 'true':
                if self.delete_object(container, obj['name'], meta):
                    deleted = True
        return deleted

    def get_object_metadata(self, container, name):
        try:
            return self._object_store_client.head(
                self._get_object_endpoint(container, name)).headers
        except exc.OpenStackCloudException as e:
            if e.response.status_code == 404:
                return None
            raise

    def get_object_raw(self, container, obj, query_string=None, stream=False):
        """Get a raw response object for an object.

        :param string container: name of the container.
        :param string obj: name of the object.
        :param string query_string:
            query args for uri. (delimiter, prefix, etc.)
        :param bool stream:
            Whether to stream the response or not.

        :returns: A `requests.Response`
        :raises: OpenStackCloudException on operation error.
        """
        endpoint = self._get_object_endpoint(container, obj, query_string)
        return self._object_store_client.get(endpoint, stream=stream)

    def _get_object_endpoint(self, container, obj=None, query_string=None):
        endpoint = urllib.parse.quote(container)
        if obj:
            endpoint = '{endpoint}/{object}'.format(
                endpoint=endpoint,
                object=urllib.parse.quote(obj)
            )
        if query_string:
            endpoint = '{endpoint}?{query_string}'.format(
                endpoint=endpoint, query_string=query_string)
        return endpoint
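
    # Sketch of the endpoint construction above (illustrative names):
    #
    #   # _get_object_endpoint('my container', 'a/b c')
    #   #   -> 'my%20container/a/b%20c'
    #   # _get_object_endpoint('images', 'img', 'prefix=seg')
    #   #   -> 'images/img?prefix=seg'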

    def stream_object(
            self, container, obj, query_string=None, resp_chunk_size=1024):
        """Download the content via a streaming iterator.

        :param string container: name of the container.
        :param string obj: name of the object.
        :param string query_string:
            query args for uri. (delimiter, prefix, etc.)
        :param int resp_chunk_size:
            chunk size of data to read. (optional, defaults to 1k)

        :returns:
            An iterator over the content or None if the object is not found.
        :raises: OpenStackCloudException on operation error.
        """
        try:
            with self.get_object_raw(
                    container, obj, query_string=query_string) as response:
                for ret in response.iter_content(chunk_size=resp_chunk_size):
                    yield ret
        except exc.OpenStackCloudHTTPError as e:
            if e.response.status_code == 404:
                return
            raise

    def get_object(self, container, obj, query_string=None,
                   resp_chunk_size=1024, outfile=None, stream=False):
        """Get the headers and body of an object.

        :param string container: name of the container.
        :param string obj: name of the object.
        :param string query_string:
            query args for uri. (delimiter, prefix, etc.)
        :param int resp_chunk_size:
            chunk size of data to read. Only used if the results are
            being written to a file or stream is True.
            (optional, defaults to 1k)
        :param outfile:
            Write the object to a file instead of returning the contents.
            If this option is given, body in the return tuple will be None.
            outfile can either be a file path given as a string, or a
            File like object.

        :returns: Tuple (headers, body) of the object, or None if the object
            is not found (404).
        :raises: OpenStackCloudException on operation error.
        """
        # TODO(mordred) implement resp_chunk_size
        endpoint = self._get_object_endpoint(container, obj, query_string)
        try:
            get_stream = (outfile is not None)
            with self._object_store_client.get(
                    endpoint, stream=get_stream) as response:
                response_headers = {
                    k.lower(): v for k, v in response.headers.items()}
                if outfile:
                    if isinstance(outfile, str):
                        outfile_handle = open(outfile, 'wb')
                    else:
                        outfile_handle = outfile
                    for chunk in response.iter_content(
                            resp_chunk_size, decode_unicode=False):
                        outfile_handle.write(chunk)
                    if isinstance(outfile, str):
                        outfile_handle.close()
                    else:
                        outfile_handle.flush()
                    return (response_headers, None)
                else:
                    return (response_headers, response.text)
        except exc.OpenStackCloudHTTPError as e:
            if e.response.status_code == 404:
                return None
            raise

    def _wait_for_futures(self, futures, raise_on_error=True):
        '''Collect results or failures from a list of running future tasks.'''

        results = []
        retries = []

        # Check on each result as its thread finishes
        for completed in concurrent.futures.as_completed(futures):
            try:
                result = completed.result()
                exceptions.raise_from_response(result)
                results.append(result)
            except (keystoneauth1.exceptions.RetriableConnectionFailure,
                    exceptions.HttpException) as e:
                error_text = "Exception processing async task: {}".format(
                    str(e))
                if raise_on_error:
                    self.log.exception(error_text)
                    raise
                else:
                    self.log.debug(error_text)
                # If we get an exception, put the result into a list so we
                # can try again
                retries.append(completed.result())
        return results, retries
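
    # Usage sketch (illustrative only; assumes an authenticated ``cloud``
    # connection):
    #
    #   with open('/tmp/image.qcow2', 'wb') as f:
    #       for chunk in cloud.stream_object('backups', 'image.qcow2'):
    #           f.write(chunk)
    #   headers, body = cloud.get_object('backups', 'notes.txt')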

    def _hashes_up_to_date(self, md5, sha256, md5_key, sha256_key):
        '''Compare md5 and sha256 hashes for being up to date.

        md5 and sha256 are the current values.
        md5_key and sha256_key are the previous values.
        '''
        up_to_date = False
        if md5 and md5_key == md5:
            up_to_date = True
        if sha256 and sha256_key == sha256:
            up_to_date = True
        if md5 and md5_key != md5:
            up_to_date = False
        if sha256 and sha256_key != sha256:
            up_to_date = False
        return up_to_date
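
    # Sketch of the comparison above (illustrative values): any provided
    # hash must match, and at least one hash must match.
    #
    #   _hashes_up_to_date('a', 'b', md5_key='a', sha256_key='b')  # True
    #   _hashes_up_to_date('a', None, md5_key='a', sha256_key='')  # True
    #   _hashes_up_to_date('a', 'b', md5_key='a', sha256_key='x')  # False
    #   _hashes_up_to_date(None, None, md5_key='', sha256_key='')  # False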