1# Licensed under the Apache License, Version 2.0 (the "License");
2# you may not use this file except in compliance with the License.
3# You may obtain a copy of the License at
5#    http://www.apache.org/licenses/LICENSE-2.0
7# Unless required by applicable law or agreed to in writing, software
8# distributed under the License is distributed on an "AS IS" BASIS,
10# See the License for the specific language governing permissions and
11# limitations under the License.
13# import types so that we can reference ListType in sphinx param declarations.
14# We can't just use list, because sphinx gets confused by
15# openstack.resource.Resource.list and openstack.resource2.Resource.list
16import collections
17import concurrent.futures
18import hashlib
19import json
20import os
21import types  # noqa
22import urllib.parse
24import keystoneauth1.exceptions
26from openstack.cloud import _normalize
27from openstack.cloud import _utils
28from openstack.cloud import exc
29from openstack import exceptions
30from openstack import proxy
31from openstack import utils
35# This halves the current default for Swift
36DEFAULT_MAX_FILE_SIZE = (5 * 1024 * 1024 * 1024 + 2) / 2
40    'public': '.r:*,.rlistings',
41    'private': '',
45class ObjectStoreCloudMixin(_normalize.Normalizer):
47    @property
48    def _object_store_client(self):
49        if 'object-store' not in self._raw_clients:
50            raw_client = self._get_raw_client('object-store')
51            self._raw_clients['object-store'] = raw_client
52        return self._raw_clients['object-store']
54    def list_containers(self, full_listing=True, prefix=None):
55        """List containers.
57        :param full_listing: Ignored. Present for backwards compat
59        :returns: list of Munch of the container objects
61        :raises: OpenStackCloudException on operation error.
62        """
63        params = dict(format='json', prefix=prefix)
64        response = self.object_store.get('/', params=params)
65        return self._get_and_munchify(None, proxy._json_response(response))
67    def search_containers(self, name=None, filters=None):
68        """Search containers.
70        :param string name: container name.
71        :param filters: a dict containing additional filters to use.
72            OR
73            A string containing a jmespath expression for further filtering.
74            Example:: "[?last_name==`Smith`] | [?other.gender]==`Female`]"
76        :returns: a list of ``munch.Munch`` containing the containers.
78        :raises: ``OpenStackCloudException``: if something goes wrong during
79            the OpenStack API call.
80        """
81        containers = self.list_containers()
82        return _utils._filter_list(containers, name, filters)
84    def get_container(self, name, skip_cache=False):
85        """Get metadata about a container.
87        :param str name:
88            Name of the container to get metadata for.
89        :param bool skip_cache:
90            Ignore the cache of container metadata for this container.o
91            Defaults to ``False``.
92        """
93        if skip_cache or name not in self._container_cache:
94            try:
95                response = self.object_store.head(
96                    self._get_object_endpoint(name)
97                )
98                exceptions.raise_from_response(response)
99                self._container_cache[name] = response.headers
100            except exc.OpenStackCloudHTTPError as e:
101                if e.response.status_code == 404:
102                    return None
103                raise
104        return self._container_cache[name]
106    def create_container(self, name, public=False):
107        """Create an object-store container.
109        :param str name:
110            Name of the container to create.
111        :param bool public:
112            Whether to set this container to be public. Defaults to ``False``.
113        """
114        container = self.get_container(name)
115        if container:
116            return container
117        exceptions.raise_from_response(self.object_store.put(
118            self._get_object_endpoint(name)
119        ))
120        if public:
121            self.set_container_access(name, 'public')
122        return self.get_container(name, skip_cache=True)
124    def delete_container(self, name):
125        """Delete an object-store container.
127        :param str name: Name of the container to delete.
128        """
129        try:
130            exceptions.raise_from_response(self.object_store.delete(
131                self._get_object_endpoint(name)
132            ))
133            self._container_cache.pop(name, None)
134            return True
135        except exc.OpenStackCloudHTTPError as e:
136            if e.response.status_code == 404:
137                return False
138            if e.response.status_code == 409:
139                raise exc.OpenStackCloudException(
140                    'Attempt to delete container {container} failed. The'
141                    ' container is not empty. Please delete the objects'
142                    ' inside it before deleting the container'.format(
143                        container=name))
144            raise
146    def update_container(self, name, headers):
147        """Update the metadata in a container.
149        :param str name:
150            Name of the container to create.
151        :param dict headers:
152            Key/Value headers to set on the container.
153        """
154        """Update the metadata in a container.
156        :param str name:
157            Name of the container to update.
158        :param dict headers:
159            Key/Value headers to set on the container.
160        """
161        exceptions.raise_from_response(
162            self.object_store.post(
163                self._get_object_endpoint(name), headers=headers)
164        )
166    def set_container_access(self, name, access):
167        """Set the access control list on a container.
169        :param str name:
170            Name of the container.
171        :param str access:
172            ACL string to set on the container. Can also be ``public``
173            or ``private`` which will be translated into appropriate ACL
174            strings.
175        """
176        if access not in OBJECT_CONTAINER_ACLS:
177            raise exc.OpenStackCloudException(
178                "Invalid container access specified: %s.  Must be one of %s"
179                % (access, list(OBJECT_CONTAINER_ACLS.keys())))
180        header = {'x-container-read': OBJECT_CONTAINER_ACLS[access]}
181        self.update_container(name, header)
183    def get_container_access(self, name):
184        """Get the control list from a container.
186        :param str name: Name of the container.
187        """
188        container = self.get_container(name, skip_cache=True)
189        if not container:
190            raise exc.OpenStackCloudException("Container not found: %s" % name)
191        acl = container.get('x-container-read', '')
192        for key, value in OBJECT_CONTAINER_ACLS.items():
193            # Convert to string for the comparison because swiftclient
194            # returns byte values as bytes sometimes and apparently ==
195            # on bytes doesn't work like you'd think
196            if str(acl) == str(value):
197                return key
198        raise exc.OpenStackCloudException(
199            "Could not determine container access for ACL: %s." % acl)
201    def _get_file_hashes(self, filename):
202        file_key = "{filename}:{mtime}".format(
203            filename=filename,
204            mtime=os.stat(filename).st_mtime)
205        if file_key not in self._file_hash_cache:
206            self.log.debug(
207                'Calculating hashes for %(filename)s', {'filename': filename})
208            (md5, sha256) = (None, None)
209            with open(filename, 'rb') as file_obj:
210                (md5, sha256) = self._calculate_data_hashes(file_obj)
211            self._file_hash_cache[file_key] = dict(
212                md5=md5, sha256=sha256)
213            self.log.debug(
214                "Image file %(filename)s md5:%(md5)s sha256:%(sha256)s",
215                {'filename': filename,
216                 'md5': self._file_hash_cache[file_key]['md5'],
217                 'sha256': self._file_hash_cache[file_key]['sha256']})
218        return (self._file_hash_cache[file_key]['md5'],
219                self._file_hash_cache[file_key]['sha256'])
221    def _calculate_data_hashes(self, data):
222        md5 = utils.md5(usedforsecurity=False)
223        sha256 = hashlib.sha256()
225        if hasattr(data, 'read'):
226            for chunk in iter(lambda: data.read(8192), b''):
227                md5.update(chunk)
228                sha256.update(chunk)
229        else:
230            md5.update(data)
231            sha256.update(data)
232        return (md5.hexdigest(), sha256.hexdigest())
234    @_utils.cache_on_arguments()
235    def get_object_capabilities(self):
236        """Get infomation about the object-storage service
238        The object-storage service publishes a set of capabilities that
239        include metadata about maximum values and thresholds.
240        """
241        # The endpoint in the catalog has version and project-id in it
242        # To get capabilities, we have to disassemble and reassemble the URL
243        # This logic is taken from swiftclient
244        endpoint = urllib.parse.urlparse(self.object_store.get_endpoint())
245        url = "{scheme}://{netloc}/info".format(
246            scheme=endpoint.scheme, netloc=endpoint.netloc)
248        return proxy._json_response(self.object_store.get(url))
250    def get_object_segment_size(self, segment_size):
251        """Get a segment size that will work given capabilities"""
252        if segment_size is None:
253            segment_size = DEFAULT_OBJECT_SEGMENT_SIZE
254        min_segment_size = 0
255        try:
256            caps = self.get_object_capabilities()
257        except exc.OpenStackCloudHTTPError as e:
258            if e.response.status_code in (404, 412):
259                server_max_file_size = DEFAULT_MAX_FILE_SIZE
260                self.log.info(
261                    "Swift capabilities not supported. "
262                    "Using default max file size.")
263            else:
264                raise
265        else:
266            server_max_file_size = caps.get('swift', {}).get('max_file_size',
267                                                             0)
268            min_segment_size = caps.get('slo', {}).get('min_segment_size', 0)
270        if segment_size > server_max_file_size:
271            return server_max_file_size
272        if segment_size < min_segment_size:
273            return min_segment_size
274        return segment_size
276    def is_object_stale(
277            self, container, name, filename, file_md5=None, file_sha256=None):
278        """Check to see if an object matches the hashes of a file.
280        :param container: Name of the container.
281        :param name: Name of the object.
282        :param filename: Path to the file.
283        :param file_md5:
284            Pre-calculated md5 of the file contents. Defaults to None which
285            means calculate locally.
286        :param file_sha256:
287            Pre-calculated sha256 of the file contents. Defaults to None which
288            means calculate locally.
289        """
290        metadata = self.get_object_metadata(container, name)
291        if not metadata:
292            self.log.debug(
293                "swift stale check, no object: {container}/{name}".format(
294                    container=container, name=name))
295            return True
297        if not (file_md5 or file_sha256):
298            (file_md5, file_sha256) = self._get_file_hashes(filename)
299        md5_key = metadata.get(
300            self._OBJECT_MD5_KEY, metadata.get(self._SHADE_OBJECT_MD5_KEY, ''))
301        sha256_key = metadata.get(
302            self._OBJECT_SHA256_KEY, metadata.get(
303                self._SHADE_OBJECT_SHA256_KEY, ''))
304        up_to_date = self._hashes_up_to_date(
305            md5=file_md5, sha256=file_sha256,
306            md5_key=md5_key, sha256_key=sha256_key)
308        if not up_to_date:
309            self.log.debug(
310                "swift checksum mismatch: "
311                " %(filename)s!=%(container)s/%(name)s",
312                {'filename': filename, 'container': container, 'name': name})
313            return True
315        self.log.debug(
316            "swift object up to date: %(container)s/%(name)s",
317            {'container': container, 'name': name})
318        return False
320    def create_directory_marker_object(self, container, name, **headers):
321        """Create a zero-byte directory marker object
323        .. note::
325          This method is not needed in most cases. Modern swift does not
326          require directory marker objects. However, some swift installs may
327          need these.
329        When using swift Static Web and Web Listings to serve static content
330        one may need to create a zero-byte object to represent each
331        "directory". Doing so allows Web Listings to generate an index of the
332        objects inside of it, and allows Static Web to render index.html
333        "files" that are "inside" the directory.
335        :param container: The name of the container.
336        :param name: Name for the directory marker object within the container.
337        :param headers: These will be passed through to the object creation
338            API as HTTP Headers.
339        """
340        headers['content-type'] = 'application/directory'
342        return self.create_object(
343            container,
344            name,
345            data='',
346            generate_checksums=False,
347            **headers)
349    def create_object(
350            self, container, name, filename=None,
351            md5=None, sha256=None, segment_size=None,
352            use_slo=True, metadata=None,
353            generate_checksums=None, data=None,
354            **headers):
355        """Create a file object.
357        Automatically uses large-object segments if needed.
359        :param container: The name of the container to store the file in.
360            This container will be created if it does not exist already.
361        :param name: Name for the object within the container.
362        :param filename: The path to the local file whose contents will be
363            uploaded. Mutually exclusive with data.
364        :param data: The content to upload to the object. Mutually exclusive
365           with filename.
366        :param md5: A hexadecimal md5 of the file. (Optional), if it is known
367            and can be passed here, it will save repeating the expensive md5
368            process. It is assumed to be accurate.
369        :param sha256: A hexadecimal sha256 of the file. (Optional) See md5.
370        :param segment_size: Break the uploaded object into segments of this
371            many bytes. (Optional) Shade will attempt to discover the maximum
372            value for this from the server if it is not specified, or will use
373            a reasonable default.
374        :param headers: These will be passed through to the object creation
375            API as HTTP Headers.
376        :param use_slo: If the object is large enough to need to be a Large
377            Object, use a static rather than dynamic object. Static Objects
378            will delete segment objects when the manifest object is deleted.
379            (optional, defaults to True)
380        :param generate_checksums: Whether to generate checksums on the client
381            side that get added to headers for later prevention of double
382            uploads of identical data. (optional, defaults to True)
383        :param metadata: This dict will get changed into headers that set
384            metadata of the object
386        :raises: ``OpenStackCloudException`` on operation error.
387        """
388        if data is not None and filename:
389            raise ValueError(
390                "Both filename and data given. Please choose one.")
391        if data is not None and not name:
392            raise ValueError(
393                "name is a required parameter when data is given")
394        if data is not None and generate_checksums:
395            raise ValueError(
396                "checksums cannot be generated with data parameter")
397        if generate_checksums is None:
398            if data is not None:
399                generate_checksums = False
400            else:
401                generate_checksums = True
403        if not metadata:
404            metadata = {}
406        if not filename and data is None:
407            filename = name
409        if generate_checksums and (md5 is None or sha256 is None):
410            (md5, sha256) = self._get_file_hashes(filename)
411        if md5:
412            headers[self._OBJECT_MD5_KEY] = md5 or ''
413        if sha256:
414            headers[self._OBJECT_SHA256_KEY] = sha256 or ''
415        for (k, v) in metadata.items():
416            if not k.lower().startswith('x-object-meta-'):
417                headers['x-object-meta-' + k] = v
418            else:
419                headers[k] = v
421        endpoint = self._get_object_endpoint(container, name)
423        if data is not None:
424            self.log.debug(
425                "swift uploading data to %(endpoint)s",
426                {'endpoint': endpoint})
428            return self._upload_object_data(endpoint, data, headers)
430        # segment_size gets used as a step value in a range call, so needs
431        # to be an int
432        if segment_size:
433            segment_size = int(segment_size)
434        segment_size = self.get_object_segment_size(segment_size)
435        file_size = os.path.getsize(filename)
437        if self.is_object_stale(container, name, filename, md5, sha256):
439            self.log.debug(
440                "swift uploading %(filename)s to %(endpoint)s",
441                {'filename': filename, 'endpoint': endpoint})
443            if file_size <= segment_size:
444                self._upload_object(endpoint, filename, headers)
445            else:
446                self._upload_large_object(
447                    endpoint, filename, headers,
448                    file_size, segment_size, use_slo)
450    def _upload_object_data(self, endpoint, data, headers):
451        return proxy._json_response(self.object_store.put(
452            endpoint, headers=headers, data=data))
454    def _upload_object(self, endpoint, filename, headers):
455        return proxy._json_response(self.object_store.put(
456            endpoint, headers=headers, data=open(filename, 'rb')))
458    def _get_file_segments(self, endpoint, filename, file_size, segment_size):
459        # Use an ordered dict here so that testing can replicate things
460        segments = collections.OrderedDict()
461        for (index, offset) in enumerate(range(0, file_size, segment_size)):
462            remaining = file_size - (index * segment_size)
463            segment = _utils.FileSegment(
464                filename, offset,
465                segment_size if segment_size < remaining else remaining)
466            name = '{endpoint}/{index:0>6}'.format(
467                endpoint=endpoint, index=index)
468            segments[name] = segment
469        return segments
471    def _object_name_from_url(self, url):
472        '''Get container_name/object_name from the full URL called.
474        Remove the Swift endpoint from the front of the URL, and remove
475        the leaving / that will leave behind.'''
476        endpoint = self.object_store.get_endpoint()
477        object_name = url.replace(endpoint, '')
478        if object_name.startswith('/'):
479            object_name = object_name[1:]
480        return object_name
482    def _add_etag_to_manifest(self, segment_results, manifest):
483        for result in segment_results:
484            if 'Etag' not in result.headers:
485                continue
486            name = self._object_name_from_url(result.url)
487            for entry in manifest:
488                if entry['path'] == '/{name}'.format(name=name):
489                    entry['etag'] = result.headers['Etag']
491    def _upload_large_object(
492            self, endpoint, filename,
493            headers, file_size, segment_size, use_slo):
494        # If the object is big, we need to break it up into segments that
495        # are no larger than segment_size, upload each of them individually
496        # and then upload a manifest object. The segments can be uploaded in
497        # parallel, so we'll use the async feature of the TaskManager.
499        segment_futures = []
500        segment_results = []
501        retry_results = []
502        retry_futures = []
503        manifest = []
505        # Get an OrderedDict with keys being the swift location for the
506        # segment, the value a FileSegment file-like object that is a
507        # slice of the data for the segment.
508        segments = self._get_file_segments(
509            endpoint, filename, file_size, segment_size)
511        # Schedule the segments for upload
512        for name, segment in segments.items():
513            # Async call to put - schedules execution and returns a future
514            segment_future = self._pool_executor.submit(
515                self.object_store.put,
516                name, headers=headers, data=segment,
517                raise_exc=False)
518            segment_futures.append(segment_future)
519            # TODO(mordred) Collect etags from results to add to this manifest
520            # dict. Then sort the list of dicts by path.
521            manifest.append(dict(
522                path='/{name}'.format(name=name),
523                size_bytes=segment.length))
525        # Try once and collect failed results to retry
526        segment_results, retry_results = self._wait_for_futures(
527            segment_futures, raise_on_error=False)
529        self._add_etag_to_manifest(segment_results, manifest)
531        for result in retry_results:
532            # Grab the FileSegment for the failed upload so we can retry
533            name = self._object_name_from_url(result.url)
534            segment = segments[name]
535            segment.seek(0)
536            # Async call to put - schedules execution and returns a future
537            segment_future = self._pool_executor.submit(
538                self.object_store.put,
539                name, headers=headers, data=segment)
540            # TODO(mordred) Collect etags from results to add to this manifest
541            # dict. Then sort the list of dicts by path.
542            retry_futures.append(segment_future)
544        # If any segments fail the second time, just throw the error
545        segment_results, retry_results = self._wait_for_futures(
546            retry_futures, raise_on_error=True)
548        self._add_etag_to_manifest(segment_results, manifest)
550        # If the final manifest upload fails, remove the segments we've
551        # already uploaded.
552        try:
553            if use_slo:
554                return self._finish_large_object_slo(endpoint, headers,
555                                                     manifest)
556            else:
557                return self._finish_large_object_dlo(endpoint, headers)
558        except Exception:
559            try:
560                segment_prefix = endpoint.split('/')[-1]
561                self.log.debug(
562                    "Failed to upload large object manifest for %s. "
563                    "Removing segment uploads.", segment_prefix)
564                self.delete_autocreated_image_objects(
565                    segment_prefix=segment_prefix)
566            except Exception:
567                self.log.exception(
568                    "Failed to cleanup image objects for %s:",
569                    segment_prefix)
570            raise
572    def _finish_large_object_slo(self, endpoint, headers, manifest):
573        # TODO(mordred) send an etag of the manifest, which is the md5sum
574        # of the concatenation of the etags of the results
575        headers = headers.copy()
576        retries = 3
577        while True:
578            try:
579                return self._object_store_client.put(
580                    endpoint,
581                    params={'multipart-manifest': 'put'},
582                    headers=headers, data=json.dumps(manifest))
583            except Exception:
584                retries -= 1
585                if retries == 0:
586                    raise
588    def _finish_large_object_dlo(self, endpoint, headers):
589        headers = headers.copy()
590        headers['X-Object-Manifest'] = endpoint
591        retries = 3
592        while True:
593            try:
594                return self._object_store_client.put(endpoint, headers=headers)
595            except Exception:
596                retries -= 1
597                if retries == 0:
598                    raise
600    def update_object(self, container, name, metadata=None, **headers):
601        """Update the metadata of an object
603        :param container: The name of the container the object is in
604        :param name: Name for the object within the container.
605        :param metadata: This dict will get changed into headers that set
606            metadata of the object
607        :param headers: These will be passed through to the object update
608            API as HTTP Headers.
610        :raises: ``OpenStackCloudException`` on operation error.
611        """
612        if not metadata:
613            metadata = {}
615        metadata_headers = {}
617        for (k, v) in metadata.items():
618            metadata_headers['x-object-meta-' + k] = v
620        headers = dict(headers, **metadata_headers)
622        return self._object_store_client.post(
623            self._get_object_endpoint(container, name),
624            headers=headers)
626    def list_objects(self, container, full_listing=True, prefix=None):
627        """List objects.
629        :param container: Name of the container to list objects in.
630        :param full_listing: Ignored. Present for backwards compat
631        :param string prefix:
632            only objects with this prefix will be returned.
633            (optional)
635        :returns: list of Munch of the objects
637        :raises: OpenStackCloudException on operation error.
638        """
639        params = dict(format='json', prefix=prefix)
640        data = self._object_store_client.get(container, params=params)
641        return self._get_and_munchify(None, data)
643    def search_objects(self, container, name=None, filters=None):
644        """Search objects.
646        :param string name: object name.
647        :param filters: a dict containing additional filters to use.
648            OR
649            A string containing a jmespath expression for further filtering.
650            Example:: "[?last_name==`Smith`] | [?other.gender]==`Female`]"
652        :returns: a list of ``munch.Munch`` containing the objects.
654        :raises: ``OpenStackCloudException``: if something goes wrong during
655            the OpenStack API call.
656        """
657        objects = self.list_objects(container)
658        return _utils._filter_list(objects, name, filters)
660    def delete_object(self, container, name, meta=None):
661        """Delete an object from a container.
663        :param string container: Name of the container holding the object.
664        :param string name: Name of the object to delete.
665        :param dict meta: Metadata for the object in question. (optional, will
666                          be fetched if not provided)
668        :returns: True if delete succeeded, False if the object was not found.
670        :raises: OpenStackCloudException on operation error.
671        """
672        # TODO(mordred) DELETE for swift returns status in text/plain format
673        # like so:
674        #   Number Deleted: 15
675        #   Number Not Found: 0
676        #   Response Body:
677        #   Response Status: 200 OK
678        #   Errors:
679        # We should ultimately do something with that
680        try:
681            if not meta:
682                meta = self.get_object_metadata(container, name)
683            if not meta:
684                return False
685            params = {}
686            if meta.get('X-Static-Large-Object', None) == 'True':
687                params['multipart-manifest'] = 'delete'
688            self._object_store_client.delete(
689                self._get_object_endpoint(container, name),
690                params=params)
691            return True
692        except exc.OpenStackCloudHTTPError:
693            return False
695    def delete_autocreated_image_objects(self, container=None,
696                                         segment_prefix=None):
697        """Delete all objects autocreated for image uploads.
699        This method should generally not be needed, as shade should clean up
700        the objects it uses for object-based image creation. If something
701        goes wrong and it is found that there are leaked objects, this method
702        can be used to delete any objects that shade has created on the user's
703        behalf in service of image uploads.
705        :param str container: Name of the container. Defaults to 'images'.
706        :param str segment_prefix: Prefix for the image segment names to
707            delete. If not given, all image upload segments present are
708            deleted.
709        """
710        if container is None:
711            container = self._OBJECT_AUTOCREATE_CONTAINER
712        # This method only makes sense on clouds that use tasks
713        if not self.image_api_use_tasks:
714            return False
716        deleted = False
717        for obj in self.list_objects(container, prefix=segment_prefix):
718            meta = self.get_object_metadata(container, obj['name'])
719            if meta.get(
720                    self._OBJECT_AUTOCREATE_KEY, meta.get(
721                        self._SHADE_OBJECT_AUTOCREATE_KEY)) == 'true':
722                if self.delete_object(container, obj['name'], meta):
723                    deleted = True
724        return deleted
726    def get_object_metadata(self, container, name):
727        try:
728            return self._object_store_client.head(
729                self._get_object_endpoint(container, name)).headers
730        except exc.OpenStackCloudException as e:
731            if e.response.status_code == 404:
732                return None
733            raise
735    def get_object_raw(self, container, obj, query_string=None, stream=False):
736        """Get a raw response object for an object.
738        :param string container: name of the container.
739        :param string obj: name of the object.
740        :param string query_string:
741            query args for uri. (delimiter, prefix, etc.)
742        :param bool stream:
743            Whether to stream the response or not.
745        :returns: A `requests.Response`
746        :raises: OpenStackCloudException on operation error.
747        """
748        endpoint = self._get_object_endpoint(container, obj, query_string)
749        return self._object_store_client.get(endpoint, stream=stream)
751    def _get_object_endpoint(self, container, obj=None, query_string=None):
752        endpoint = urllib.parse.quote(container)
753        if obj:
754            endpoint = '{endpoint}/{object}'.format(
755                endpoint=endpoint,
756                object=urllib.parse.quote(obj)
757            )
758        if query_string:
759            endpoint = '{endpoint}?{query_string}'.format(
760                endpoint=endpoint, query_string=query_string)
761        return endpoint
763    def stream_object(
764            self, container, obj, query_string=None, resp_chunk_size=1024):
765        """Download the content via a streaming iterator.
767        :param string container: name of the container.
768        :param string obj: name of the object.
769        :param string query_string:
770            query args for uri. (delimiter, prefix, etc.)
771        :param int resp_chunk_size:
772            chunk size of data to read. Only used if the results are
774        :returns:
775            An iterator over the content or None if the object is not found.
776        :raises: OpenStackCloudException on operation error.
777        """
778        try:
779            with self.get_object_raw(
780                    container, obj, query_string=query_string) as response:
781                for ret in response.iter_content(chunk_size=resp_chunk_size):
782                    yield ret
783        except exc.OpenStackCloudHTTPError as e:
784            if e.response.status_code == 404:
785                return
786            raise
788    def get_object(self, container, obj, query_string=None,
789                   resp_chunk_size=1024, outfile=None, stream=False):
790        """Get the headers and body of an object
792        :param string container: name of the container.
793        :param string obj: name of the object.
794        :param string query_string:
795            query args for uri. (delimiter, prefix, etc.)
796        :param int resp_chunk_size:
797            chunk size of data to read. Only used if the results are
798            being written to a file or stream is True.
799            (optional, defaults to 1k)
800        :param outfile:
801            Write the object to a file instead of returning the contents.
802            If this option is given, body in the return tuple will be None.
803            outfile can either be a file path given as a string, or a
804            File like object.
806        :returns: Tuple (headers, body) of the object, or None if the object
807                  is not found (404).
808        :raises: OpenStackCloudException on operation error.
809        """
810        # TODO(mordred) implement resp_chunk_size
811        endpoint = self._get_object_endpoint(container, obj, query_string)
812        try:
813            get_stream = (outfile is not None)
814            with self._object_store_client.get(
815                    endpoint, stream=get_stream) as response:
816                response_headers = {
817                    k.lower(): v for k, v in response.headers.items()}
818                if outfile:
819                    if isinstance(outfile, str):
820                        outfile_handle = open(outfile, 'wb')
821                    else:
822                        outfile_handle = outfile
823                    for chunk in response.iter_content(
824                            resp_chunk_size, decode_unicode=False):
825                        outfile_handle.write(chunk)
826                    if isinstance(outfile, str):
827                        outfile_handle.close()
828                    else:
829                        outfile_handle.flush()
830                    return (response_headers, None)
831                else:
832                    return (response_headers, response.text)
833        except exc.OpenStackCloudHTTPError as e:
834            if e.response.status_code == 404:
835                return None
836            raise
838    def _wait_for_futures(self, futures, raise_on_error=True):
839        '''Collect results or failures from a list of running future tasks.'''
841        results = []
842        retries = []
844        # Check on each result as its thread finishes
845        for completed in concurrent.futures.as_completed(futures):
846            try:
847                result = completed.result()
848                exceptions.raise_from_response(result)
849                results.append(result)
850            except (keystoneauth1.exceptions.RetriableConnectionFailure,
851                    exceptions.HttpException) as e:
852                error_text = "Exception processing async task: {}".format(
853                    str(e))
854                if raise_on_error:
855                    self.log.exception(error_text)
856                    raise
857                else:
858                    self.log.debug(error_text)
859                # If we get an exception, put the result into a list so we
860                # can try again
861                retries.append(completed.result())
862        return results, retries
864    def _hashes_up_to_date(self, md5, sha256, md5_key, sha256_key):
865        '''Compare md5 and sha256 hashes for being up to date
867        md5 and sha256 are the current values.
868        md5_key and sha256_key are the previous values.
869        '''
870        up_to_date = False
871        if md5 and md5_key == md5:
872            up_to_date = True
873        if sha256 and sha256_key == sha256:
874            up_to_date = True
875        if md5 and md5_key != md5:
876            up_to_date = False
877        if sha256 and sha256_key != sha256:
878            up_to_date = False
879        return up_to_date