# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import copy
import functools
import queue
# import types so that we can reference ListType in sphinx param declarations.
# We can't just use list, because sphinx gets confused by
# openstack.resource.Resource.list and openstack.resource2.Resource.list
import types  # noqa
import warnings

import dogpile.cache
import keystoneauth1.exceptions
import keystoneauth1.session
import munch
import requests.models
import requestsexceptions

from openstack import _log
from openstack.cloud import _floating_ip
from openstack.cloud import _object_store
from openstack.cloud import _utils
from openstack.cloud import exc
from openstack.cloud import meta
import openstack.config
from openstack.config import cloud_region as cloud_region_mod
from openstack import proxy
from openstack import utils


DEFAULT_SERVER_AGE = 5
DEFAULT_PORT_AGE = 5
DEFAULT_FLOAT_AGE = 5
_CONFIG_DOC_URL = _floating_ip._CONFIG_DOC_URL
DEFAULT_OBJECT_SEGMENT_SIZE = _object_store.DEFAULT_OBJECT_SEGMENT_SIZE
# This halves the current default for Swift
DEFAULT_MAX_FILE_SIZE = _object_store.DEFAULT_MAX_FILE_SIZE
OBJECT_CONTAINER_ACLS = _object_store.OBJECT_CONTAINER_ACLS
class _OpenStackCloudMixin:
    """Represent a connection to an OpenStack Cloud.

    OpenStackCloud is the entry point for all cloud operations, regardless
    of which OpenStack service those operations may ultimately come from.
    The operations on an OpenStackCloud are resource oriented rather than
    REST API operation oriented. For instance, one will request a Floating IP
    and that Floating IP will be actualized either via neutron or via nova
    depending on how this particular cloud has decided to arrange itself.

    :param bool strict: Only return documented attributes for each resource
                        as per the Data Model contract. (Default False)
    """
    _OBJECT_MD5_KEY = 'x-object-meta-x-sdk-md5'
    _OBJECT_SHA256_KEY = 'x-object-meta-x-sdk-sha256'
    _OBJECT_AUTOCREATE_KEY = 'x-object-meta-x-sdk-autocreated'
    _OBJECT_AUTOCREATE_CONTAINER = 'images'

    # NOTE(shade) shade keys were x-object-meta-x-shade-md5 - we need to check
    #             those in freshness checks so that a shade->sdk transition
    #             doesn't result in a re-upload
    _SHADE_OBJECT_MD5_KEY = 'x-object-meta-x-shade-md5'
    _SHADE_OBJECT_SHA256_KEY = 'x-object-meta-x-shade-sha256'
    _SHADE_OBJECT_AUTOCREATE_KEY = 'x-object-meta-x-shade-autocreated'
    def __init__(self):

        super(_OpenStackCloudMixin, self).__init__()

        self.log = _log.setup_logging('openstack')

        self.name = self.config.name
        self.auth = self.config.get_auth_args()
        self.default_interface = self.config.get_interface()
        self.force_ipv4 = self.config.force_ipv4

        (self.verify, self.cert) = self.config.get_requests_verify_args()

        # Turn off urllib3 warnings about insecure certs if we have
        # explicitly configured requests to tell it we do not want
        # cert verification
        if not self.verify:
            self.log.debug(
                "Turning off Insecure SSL warnings since verify=False")
            category = requestsexceptions.InsecureRequestWarning
            if category:
                # InsecureRequestWarning references a Warning class or is None
                warnings.filterwarnings('ignore', category=category)

        self._disable_warnings = {}

        cache_expiration_time = int(self.config.get_cache_expiration_time())
        cache_class = self.config.get_cache_class()
        cache_arguments = self.config.get_cache_arguments()

        self._resource_caches = {}

        if cache_class != 'dogpile.cache.null':
            self.cache_enabled = True
            self._cache = self._make_cache(
                cache_class, cache_expiration_time, cache_arguments)
            expirations = self.config.get_cache_expirations()
            for expire_key in expirations.keys():
                # Only build caches for things we have list operations for
                if getattr(
                        self, 'list_{0}'.format(expire_key), None):
                    self._resource_caches[expire_key] = self._make_cache(
                        cache_class, expirations[expire_key], cache_arguments)

            self._SERVER_AGE = DEFAULT_SERVER_AGE
            self._PORT_AGE = DEFAULT_PORT_AGE
            self._FLOAT_AGE = DEFAULT_FLOAT_AGE
        else:
            self.cache_enabled = False

            def _fake_invalidate(unused):
                pass

            class _FakeCache:
                def invalidate(self):
                    pass

            # Don't cache list_servers if we're not caching things.
            # Replace this with a more specific cache configuration
            # soon.
            self._SERVER_AGE = 0
            self._PORT_AGE = 0
            self._FLOAT_AGE = 0
            self._cache = _FakeCache()
            # Undecorate cache decorated methods. Otherwise the call stacks
            # wind up being stupidly long and hard to debug
            for method in _utils._decorated_methods:
                meth_obj = getattr(self, method, None)
                if not meth_obj:
                    continue
                if (hasattr(meth_obj, 'invalidate')
                        and hasattr(meth_obj, 'func')):
                    new_func = functools.partial(meth_obj.func, self)
                    new_func.invalidate = _fake_invalidate
                    setattr(self, method, new_func)

        # If server expiration time is set explicitly, use that. Otherwise
        # fall back to whatever it was before
        self._SERVER_AGE = self.config.get_cache_resource_expiration(
            'server', self._SERVER_AGE)
        self._PORT_AGE = self.config.get_cache_resource_expiration(
            'port', self._PORT_AGE)
        self._FLOAT_AGE = self.config.get_cache_resource_expiration(
            'floating_ip', self._FLOAT_AGE)

        self._container_cache = dict()
        self._file_hash_cache = dict()

        # self.__pool_executor = None

        self._raw_clients = {}

        self._local_ipv6 = (
            _utils.localhost_supports_ipv6() if not self.force_ipv4 else False)
    def connect_as(self, **kwargs):
        """Make a new OpenStackCloud object with new auth context.

        Take the existing settings from the current cloud and construct a new
        OpenStackCloud object with some of the auth settings overridden. This
        is useful for getting an object to perform tasks with as another user,
        or in the context of a different project.

        .. code-block:: python

          conn = openstack.connect(cloud='example')
          # Work normally
          servers = conn.list_servers()
          conn2 = conn.connect_as(username='different-user', password='')
          # Work as different-user
          servers = conn2.list_servers()

        :param kwargs: keyword arguments can contain anything that would
                       normally go in an auth dict. They will override the same
                       settings from the parent cloud as appropriate. Entries
                       that should not be overridden can be omitted.
        """

        if self.config._openstack_config:
            config = self.config._openstack_config
        else:
            # TODO(mordred) Replace this with from_session
            config = openstack.config.OpenStackConfig(
                app_name=self.config._app_name,
                app_version=self.config._app_version,
                load_yaml_config=False)
        params = copy.deepcopy(self.config.config)
        # Remove profile from current cloud so that overriding works
        params.pop('profile', None)

        # Utility function to help with the stripping below.
        def pop_keys(params, auth, name_key, id_key):
            if name_key in auth or id_key in auth:
                params['auth'].pop(name_key, None)
                params['auth'].pop(id_key, None)

        # If there are user, project or domain settings in the incoming auth
        # dict, strip out both id and name so that a user can say:
        #     cloud.connect_as(project_name='foo')
        # and have that work with clouds that have a project_id set in their
        # config.
        for prefix in ('user', 'project'):
            if prefix == 'user':
                name_key = 'username'
            else:
                name_key = 'project_name'
            id_key = '{prefix}_id'.format(prefix=prefix)
            pop_keys(params, kwargs, name_key, id_key)
            id_key = '{prefix}_domain_id'.format(prefix=prefix)
            name_key = '{prefix}_domain_name'.format(prefix=prefix)
            pop_keys(params, kwargs, name_key, id_key)

        for key, value in kwargs.items():
            params['auth'][key] = value

        cloud_region = config.get_one(**params)
        # Attach the discovery cache from the old session so we won't
        # double discover.
        cloud_region._discovery_cache = self.session._discovery_cache
        # Override the cloud name so that logging/location work right
        cloud_region._name = self.name
        cloud_region.config['profile'] = self.name
        # Use self.__class__ so that we return whatever this is, like if it's
        # a subclass in the case of shade wrapping sdk.
        return self.__class__(config=cloud_region)

    def connect_as_project(self, project):
        """Make a new OpenStackCloud object with a new project.

        Take the existing settings from the current cloud and construct a new
        OpenStackCloud object with the project settings overridden. This
        is useful for getting an object to perform tasks with as another user,
        or in the context of a different project.

        .. code-block:: python

          cloud = openstack.connect(cloud='example')
          # Work normally
          servers = cloud.list_servers()
          cloud2 = cloud.connect_as_project('different-project')
          # Work in different-project
          servers = cloud2.list_servers()

        :param project: Either a project name or a project dict as returned by
                        `list_projects`.
        """
        auth = {}
        if isinstance(project, dict):
            auth['project_id'] = project.get('id')
            auth['project_name'] = project.get('name')
            if project.get('domain_id'):
                auth['project_domain_id'] = project['domain_id']
        else:
            auth['project_name'] = project
        return self.connect_as(**auth)

    def global_request(self, global_request_id):
        """Make a new Connection object with a global request id set.

        Take the existing settings from the current Connection and construct a
        new Connection object with the global_request_id overridden.

        .. code-block:: python

          from oslo_context import context
          cloud = openstack.connect(cloud='example')
          # Work normally
          servers = cloud.list_servers()
          cloud2 = cloud.global_request(context.generate_request_id())
          # cloud2 sends all requests with global_request_id set
          servers = cloud2.list_servers()

        Additionally, this can be used as a context manager:

        .. code-block:: python

          from oslo_context import context
          c = openstack.connect(cloud='example')
          # Work normally
          servers = c.list_servers()
          with c.global_request(context.generate_request_id()) as c2:
              # c2 sends all requests with global_request_id set
              servers = c2.list_servers()

        :param global_request_id: The `global_request_id` to send.
        """
        params = copy.deepcopy(self.config.config)
        cloud_region = cloud_region_mod.from_session(
            session=self.session,
            app_name=self.config._app_name,
            app_version=self.config._app_version,
            discovery_cache=self.session._discovery_cache,
            **params)

        # Override the cloud name so that logging/location work right
        cloud_region._name = self.name
        cloud_region.config['profile'] = self.name
        # Use self.__class__ so that we return whatever this is, like if it's
        # a subclass in the case of shade wrapping sdk.
        new_conn = self.__class__(config=cloud_region)
        new_conn.set_global_request_id(global_request_id)
        return new_conn

    def _make_cache(self, cache_class, expiration_time, arguments):
        return dogpile.cache.make_region(
            function_key_generator=self._make_cache_key
        ).configure(
            cache_class,
            expiration_time=expiration_time,
            arguments=arguments)

    def _make_cache_key(self, namespace, fn):
        fname = fn.__name__
        if namespace is None:
            name_key = self.name
        else:
            name_key = '%s:%s' % (self.name, namespace)

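        # The generated key is "<name_key>_<fname>_<args>_<kwargs>", e.g.
        # (illustrative) "example_list_servers__detailed:True" for
        # list_servers(detailed=True) on a cloud named "example".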
        def generate_key(*args, **kwargs):
            # TODO(frickler): make handling arg keys actually work
            arg_key = ''
            kw_keys = sorted(kwargs.keys())
            kwargs_key = ','.join(
                ['%s:%s' % (k, kwargs[k]) for k in kw_keys if k != 'cache'])
            ans = "_".join(
                [str(name_key), fname, arg_key, kwargs_key])
            return ans
        return generate_key

    def _get_cache(self, resource_name):
        if resource_name and resource_name in self._resource_caches:
            return self._resource_caches[resource_name]
        else:
            return self._cache

    def _get_major_version_id(self, version):
        if isinstance(version, int):
            return version
        elif isinstance(version, (str, tuple)):
            return int(version[0])
        return version

    def _get_versioned_client(
            self, service_type, min_version=None, max_version=None):
        config_version = self.config.get_api_version(service_type)
        config_major = self._get_major_version_id(config_version)
        max_major = self._get_major_version_id(max_version)
        min_major = self._get_major_version_id(min_version)
        # TODO(shade) This should be replaced with use of Connection. However,
        #             we need to find a sane way to deal with this additional
        #             logic - or we need to give up on it. If we give up on it,
        #             we need to make sure we can still support it in the shade
        #             compat layer.
        # NOTE(mordred) This logic for versions is slightly different
        # than the ksa Adapter constructor logic. openstack.cloud knows the
        # versions it knows, and uses them when it detects them. However, if
        # a user requests a version, and it's not found, and a different one
        # openstack.cloud does know about is found, that's a warning in
        # openstack.cloud.
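        # For example (illustrative): if the config pins a service to
        # version 2 but discovery only finds a version 3 endpoint, the
        # fallback adapter below is returned and a warning is emitted
        # rather than failing outright.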
        if config_version:
            if min_major and config_major < min_major:
                raise exc.OpenStackCloudException(
                    "Version {config_version} requested for {service_type}"
                    " but shade understands a minimum of {min_version}".format(
                        config_version=config_version,
                        service_type=service_type,
                        min_version=min_version))
            elif max_major and config_major > max_major:
                raise exc.OpenStackCloudException(
                    "Version {config_version} requested for {service_type}"
                    " but openstack.cloud understands a maximum of"
                    " {max_version}".format(
                        config_version=config_version,
                        service_type=service_type,
                        max_version=max_version))
            request_min_version = config_version
            request_max_version = '{version}.latest'.format(
                version=config_major)
            adapter = proxy._ShadeAdapter(
                session=self.session,
                service_type=self.config.get_service_type(service_type),
                service_name=self.config.get_service_name(service_type),
                interface=self.config.get_interface(service_type),
                endpoint_override=self.config.get_endpoint(service_type),
                region_name=self.config.get_region_name(service_type),
                statsd_prefix=self.config.get_statsd_prefix(),
                statsd_client=self.config.get_statsd_client(),
                prometheus_counter=self.config.get_prometheus_counter(),
                prometheus_histogram=self.config.get_prometheus_histogram(),
                influxdb_client=self.config.get_influxdb_client(),
                min_version=request_min_version,
                max_version=request_max_version)
            if adapter.get_endpoint():
                return adapter

        adapter = proxy._ShadeAdapter(
            session=self.session,
            service_type=self.config.get_service_type(service_type),
            service_name=self.config.get_service_name(service_type),
            interface=self.config.get_interface(service_type),
            endpoint_override=self.config.get_endpoint(service_type),
            region_name=self.config.get_region_name(service_type),
            min_version=min_version,
            max_version=max_version)

        # data.api_version can be None if no version was detected, such
        # as with neutron
        api_version = adapter.get_api_major_version(
            endpoint_override=self.config.get_endpoint(service_type))
        api_major = self._get_major_version_id(api_version)

        # If we detect a different version than was configured, warn the user.
        # shade still knows what to do - but if the user gave us an explicit
        # version and we couldn't find it, they may want to investigate.
        if api_version and config_version and (api_major != config_major):
            warning_msg = (
                '{service_type} is configured for {config_version}'
                ' but only {api_version} is available. shade is happy'
                ' with this version, but if you were trying to force an'
                ' override, that did not happen. You may want to check'
                ' your cloud, or remove the version specification from'
                ' your config.'.format(
                    service_type=service_type,
                    config_version=config_version,
                    api_version='.'.join([str(f) for f in api_version])))
            self.log.debug(warning_msg)
            warnings.warn(warning_msg)
        return adapter

    # TODO(shade) This should be replaced with using openstack Connection
    #             object.
    def _get_raw_client(
            self, service_type, api_version=None, endpoint_override=None):
        return proxy._ShadeAdapter(
            session=self.session,
            service_type=self.config.get_service_type(service_type),
            service_name=self.config.get_service_name(service_type),
            interface=self.config.get_interface(service_type),
            endpoint_override=self.config.get_endpoint(
                service_type) or endpoint_override,
            region_name=self.config.get_region_name(service_type))

    def _is_client_version(self, client, version):
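        # e.g. (illustrative) _is_client_version('identity', 3) looks up
        # self._identity_client and checks it against major version 3.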
        client_name = '_{client}_client'.format(
            client=client.replace('-', '_'))
        client = getattr(self, client_name)
        return client._version_matches(version)

    @property
    def _application_catalog_client(self):
        if 'application-catalog' not in self._raw_clients:
            self._raw_clients['application-catalog'] = self._get_raw_client(
                'application-catalog')
        return self._raw_clients['application-catalog']

    @property
    def _database_client(self):
        if 'database' not in self._raw_clients:
            self._raw_clients['database'] = self._get_raw_client('database')
        return self._raw_clients['database']

    @property
    def _raw_image_client(self):
        if 'raw-image' not in self._raw_clients:
            image_client = self._get_raw_client('image')
            self._raw_clients['raw-image'] = image_client
        return self._raw_clients['raw-image']

    def pprint(self, resource):
        """Wrapper around pprint that groks munch objects"""
        # import late since this is a utility function
        import pprint
        new_resource = _utils._dictify_resource(resource)
        pprint.pprint(new_resource)

    def pformat(self, resource):
        """Wrapper around pformat that groks munch objects"""
        # import late since this is a utility function
        import pprint
        new_resource = _utils._dictify_resource(resource)
        return pprint.pformat(new_resource)

    @property
    def _keystone_catalog(self):
        return self.session.auth.get_access(self.session).service_catalog

    @property
    def service_catalog(self):
        return self._keystone_catalog.catalog

    def endpoint_for(self, service_type, interface=None, region_name=None):
        """Return the endpoint for a given service.

        Respects config values for Connection, including
        ``*_endpoint_override``. For direct values from the catalog
        regardless of overrides, see
        :meth:`~openstack.config.cloud_region.CloudRegion.get_endpoint_from_catalog`

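        A minimal usage sketch, assuming a cloud named ``example`` that
        deploys a compute service:

        .. code-block:: python

          conn = openstack.connect(cloud='example')
          compute_url = conn.endpoint_for('compute')
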
        :param service_type: Service Type of the endpoint to search for.
        :param interface:
            Interface of the endpoint to search for. Optional, defaults to
            the configured value for interface for this Connection.
        :param region_name:
            Region Name of the endpoint to search for. Optional, defaults to
            the configured value for region_name for this Connection.

        :returns: The endpoint of the service, or None if not found.
        """

        endpoint_override = self.config.get_endpoint(service_type)
        if endpoint_override:
            return endpoint_override
        return self.config.get_endpoint_from_catalog(
            service_type=service_type,
            interface=interface,
            region_name=region_name)

    @property
    def auth_token(self):
        # Keystone's session will reuse a token if it is still valid.
        # We don't need to track validity here, just get_token() each time.
        return self.session.get_token()

    @property
    def current_user_id(self):
        """Get the id of the currently logged-in user from the token."""
        return self.session.auth.get_access(self.session).user_id

    @property
    def current_project_id(self):
        """Get the current project ID.

        Returns the project_id of the current token scope. None means that
        the token is domain scoped or unscoped.

        :raises keystoneauth1.exceptions.auth.AuthorizationFailure:
            if a new token fetch fails.
        :raises keystoneauth1.exceptions.auth_plugins.MissingAuthPlugin:
            if a plugin is not available.
        """
        return self.session.get_project_id()

    @property
    def current_project(self):
        """Return a ``munch.Munch`` describing the current project"""
        return self._get_project_info()

    def _get_project_info(self, project_id=None):
        project_info = munch.Munch(
            id=project_id,
            name=None,
            domain_id=None,
            domain_name=None,
        )
        if not project_id or project_id == self.current_project_id:
            # If we don't have a project_id parameter, it means a user is
            # directly asking what the current state is.
            # Alternately, if we have one, that means we're calling this
            # from within a normalize function, which means the object has
            # a project_id associated with it. If the project_id matches
            # the project_id of our current token, that means we can supplement
            # the info with human readable info about names if we have them.
            # If they don't match, that means we're an admin who has pulled
            # an object from a different project, so adding info from the
            # current token would be wrong.
            auth_args = self.config.config.get('auth', {})
            project_info['id'] = self.current_project_id
            project_info['name'] = auth_args.get('project_name')
            project_info['domain_id'] = auth_args.get('project_domain_id')
            project_info['domain_name'] = auth_args.get('project_domain_name')
        return project_info

    @property
    def current_location(self):
        """Return a ``munch.Munch`` explaining the current cloud location."""
        return self._get_current_location()

    def _get_current_location(self, project_id=None, zone=None):
        return munch.Munch(
            cloud=self.name,
            # TODO(efried): This is wrong, but it only seems to be used in a
            # repr; can we get rid of it?
            region_name=self.config.get_region_name(),
            zone=zone,
            project=self._get_project_info(project_id),
        )

    def _get_identity_location(self):
        """Identity resources do not exist inside of projects."""
        return munch.Munch(
            cloud=self.name,
            region_name=None,
            zone=None,
            project=munch.Munch(
                id=None,
                name=None,
                domain_id=None,
                domain_name=None))

    def _get_project_id_param_dict(self, name_or_id):
        if name_or_id:
            project = self.get_project(name_or_id)
            if not project:
                return {}
            if self._is_client_version('identity', 3):
                return {'default_project_id': project['id']}
            else:
                return {'tenant_id': project['id']}
        else:
            return {}

    def _get_domain_id_param_dict(self, domain_id):
        """Get a usable domain."""

        # Keystone v3 requires domains for user and project creation. v2 does
        # not. However, keystone v2 does not allow user creation by non-admin
        # users, so we can raise an error to the user that does not need to
        # mention api versions
        if self._is_client_version('identity', 3):
            if not domain_id:
                raise exc.OpenStackCloudException(
                    "User or project creation requires an explicit"
                    " domain_id argument.")
            else:
                return {'domain_id': domain_id}
        else:
            return {}

    def _get_identity_params(self, domain_id=None, project=None):
        """Get the domain and project/tenant parameters if needed.

        keystone v2 and v3 are divergent enough that we need to decide,
        per version, whether to pass project or tenant_id, domain
        parameters, or nothing at all, in a sane manner.
        """
        ret = {}
        ret.update(self._get_domain_id_param_dict(domain_id))
        ret.update(self._get_project_id_param_dict(project))
        return ret

    def range_search(self, data, filters):
        """Perform integer range searches across a list of dictionaries.

        Given a list of dictionaries, search across the list using the given
        dictionary keys and a range of integer values for each key. Only
        dictionaries that match ALL search filters across the entire original
        data set will be returned.

        It is not a requirement that each dictionary contain the key used
        for searching. Those without the key will be considered non-matching.

        The range values must be strings, each either a set of digits
        representing an integer for matching, or a range operator followed by
        a set of digits representing an integer. If a range operator is not
        given, exact value matching will be used. Valid operators are one of:
        <,>,<=,>=

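        For example, a hypothetical search for flavors with at most 8 vcpus
        and exactly 4096 MB of ram might look like:

        .. code-block:: python

          flavors = cloud.range_search(
              cloud.list_flavors(), {"vcpus": "<=8", "ram": "4096"})
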
        :param data: List of dictionaries to be searched.
        :param filters: Dict describing the one or more range searches to
            perform. If more than one search is given, the result will be the
            members of the original data set that match ALL searches. An
            example of filtering by multiple ranges::

                {"vcpus": "<=5", "ram": "<=2048", "disk": "1"}

        :returns: A list subset of the original data set.
        :raises: OpenStackCloudException on invalid range expressions.
        """
        filtered = []

        for key, range_value in filters.items():
            # We always want to operate on the full data set so that
            # calculations for minimum and maximum are correct.
            results = _utils.range_filter(data, key, range_value)

            if not filtered:
                # First set of results
                filtered = results
            else:
                # The combination of all searches should be the intersection of
                # all result sets from each search. So adjust the current set
                # of filtered data by computing its intersection with the
                # latest result set.
                filtered = [r for r in results for f in filtered if r == f]

        return filtered

    def _get_and_munchify(self, key, data):
        """Wrapper around meta.get_and_munchify.

        Some of the methods expect a `meta` attribute to be passed in as
        part of the method signature. In those methods the meta param
        shadows the meta module, which would make a direct call to
        meta.get_and_munchify fail.
        """
        if isinstance(data, requests.models.Response):
            data = proxy._json_response(data)
        return meta.get_and_munchify(key, data)

    def get_name(self):
        return self.name

    def get_session_endpoint(self, service_key, **kwargs):
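        """Return the endpoint for a service from this cloud's config.

        Returns None if the endpoint is not found in the catalog; wraps
        unexpected errors in OpenStackCloudException.
        """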
        if not kwargs:
            kwargs = {}
        try:
            return self.config.get_session_endpoint(service_key, **kwargs)
        except keystoneauth1.exceptions.catalog.EndpointNotFound as e:
            self.log.debug(
                "Endpoint not found in %s cloud: %s", self.name, str(e))
            endpoint = None
        except exc.OpenStackCloudException:
            raise
        except Exception as e:
            raise exc.OpenStackCloudException(
                "Error getting {service} endpoint on {cloud}:{region}:"
                " {error}".format(
                    service=service_key,
                    cloud=self.name,
                    region=self.config.get_region_name(service_key),
                    error=str(e)))
        return endpoint

    def has_service(self, service_key, version=None):
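        """Return True if the cloud provides the given service.

        If ``version`` is given, discovery must also find an endpoint
        matching that version.
        """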
        if not self.config.has_service(service_key):
            # TODO(mordred) add a stamp here so that we only report this once
            if not (service_key in self._disable_warnings
                    and self._disable_warnings[service_key]):
                self.log.debug(
                    "Disabling %(service_key)s entry in catalog"
                    " per config", {'service_key': service_key})
                self._disable_warnings[service_key] = True
            return False
        try:
            kwargs = dict()
            # If a specific version was requested - try it
            if version is not None:
                kwargs['min_version'] = version
                kwargs['max_version'] = version
            endpoint = self.get_session_endpoint(service_key, **kwargs)
        except exc.OpenStackCloudException:
            return False
        if endpoint:
            return True
        else:
            return False

    def project_cleanup(
        self,
        dry_run=True,
        wait_timeout=120,
        status_queue=None,
        filters=None,
        resource_evaluation_fn=None
    ):
        """Cleanup the project resources.

        Cleanup all resources in all services which provide cleanup methods.

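        A minimal dry-run sketch; the status queue is optional and only
        needed to inspect what was identified:

        .. code-block:: python

          import queue

          status_queue = queue.Queue()
          conn.project_cleanup(dry_run=True, status_queue=status_queue)
          while not status_queue.empty():
              print(status_queue.get_nowait())
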
        :param bool dry_run: Cleanup or only list identified resources.
        :param int wait_timeout: Maximum amount of time given to each service
            to complete the cleanup.
        :param queue status_queue: a threading queue object used to get current
            process status. The queue contains processed resources.
        :param dict filters: Additional filters for the cleanup (only resources
            matching all filters will be deleted, if there are no other
            dependencies).
        :param resource_evaluation_fn: A callback function, which will be
            invoked for each resource and must return True/False depending on
            whether the resource needs to be deleted or not.
        """
        dependencies = {}
        get_dep_fn_name = '_get_cleanup_dependencies'
        cleanup_fn_name = '_service_cleanup'
        if not status_queue:
            status_queue = queue.Queue()
        for service in self.config.get_enabled_services():
            if hasattr(self, service):
                proxy = getattr(self, service)
                if (
                    proxy
                    and hasattr(proxy, get_dep_fn_name)
                    and hasattr(proxy, cleanup_fn_name)
                ):
                    deps = getattr(proxy, get_dep_fn_name)()
                    if deps:
                        dependencies.update(deps)
        dep_graph = utils.TinyDAG()
        for k, v in dependencies.items():
            dep_graph.add_node(k)
            for dep in v['before']:
                dep_graph.add_node(dep)
                dep_graph.add_edge(k, dep)
            for dep in v.get('after', []):
                dep_graph.add_edge(dep, k)

        cleanup_resources = dict()

        for service in dep_graph.walk(timeout=wait_timeout):
            fn = None
            if hasattr(self, service):
                proxy = getattr(self, service)
                cleanup_fn = getattr(proxy, cleanup_fn_name, None)
                if cleanup_fn:
                    fn = functools.partial(
                        cleanup_fn,
                        dry_run=dry_run,
                        client_status_queue=status_queue,
                        identified_resources=cleanup_resources,
                        filters=filters,
                        resource_evaluation_fn=resource_evaluation_fn
                    )
            if fn:
                self._pool_executor.submit(
                    cleanup_task, dep_graph, service, fn
                )
            else:
                dep_graph.node_done(service)

        for count in utils.iterate_timeout(
                timeout=wait_timeout,
                message="Timeout waiting for cleanup to finish",
                wait=1):
            if dep_graph.is_complete():
                return


def cleanup_task(graph, service, fn):
    try:
        fn()
    except Exception:
        log = _log.setup_logging('openstack.project_cleanup')
        log.exception('Error in the %s cleanup function', service)
    finally:
        graph.node_done(service)