# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import copy
import functools
import queue
# import types so that we can reference ListType in sphinx param declarations.
# We can't just use list, because sphinx gets confused by
# openstack.resource.Resource.list and openstack.resource2.Resource.list
import types  # noqa
import warnings

import dogpile.cache
import keystoneauth1.exceptions
import keystoneauth1.session
import munch
import requests.models
import requestsexceptions

from openstack import _log
from openstack.cloud import _floating_ip
from openstack.cloud import _object_store
from openstack.cloud import _utils
from openstack.cloud import exc
from openstack.cloud import meta
import openstack.config
from openstack.config import cloud_region as cloud_region_mod
from openstack import proxy
from openstack import utils


DEFAULT_SERVER_AGE = 5
DEFAULT_PORT_AGE = 5
DEFAULT_FLOAT_AGE = 5
_CONFIG_DOC_URL = _floating_ip._CONFIG_DOC_URL
DEFAULT_OBJECT_SEGMENT_SIZE = _object_store.DEFAULT_OBJECT_SEGMENT_SIZE
# This halves the current default for Swift
DEFAULT_MAX_FILE_SIZE = _object_store.DEFAULT_MAX_FILE_SIZE
OBJECT_CONTAINER_ACLS = _object_store.OBJECT_CONTAINER_ACLS


class _OpenStackCloudMixin:
    """Represent a connection to an OpenStack Cloud.

    OpenStackCloud is the entry point for all cloud operations, regardless
    of which OpenStack service those operations may ultimately come from.
    The operations on an OpenStackCloud are resource oriented rather than
    REST API operation oriented. For instance, one will request a Floating IP
    and that Floating IP will be actualized either via neutron or via nova
    depending on how this particular cloud has decided to arrange itself.

    :param bool strict: Only return documented attributes for each resource
                        as per the Data Model contract. (Default False)
    """
    _OBJECT_MD5_KEY = 'x-object-meta-x-sdk-md5'
    _OBJECT_SHA256_KEY = 'x-object-meta-x-sdk-sha256'
    _OBJECT_AUTOCREATE_KEY = 'x-object-meta-x-sdk-autocreated'
    _OBJECT_AUTOCREATE_CONTAINER = 'images'

    # NOTE(shade) shade keys were x-object-meta-x-shade-md5 - we need to check
    #             those in freshness checks so that a shade->sdk transition
    #             doesn't result in a re-upload
    _SHADE_OBJECT_MD5_KEY = 'x-object-meta-x-shade-md5'
    _SHADE_OBJECT_SHA256_KEY = 'x-object-meta-x-shade-sha256'
    _SHADE_OBJECT_AUTOCREATE_KEY = 'x-object-meta-x-shade-autocreated'

    def __init__(self):
        """Initialise logging, TLS warning handling and the caches.

        Reads everything from ``self.config``, which must already be set
        by the class this mixin is combined with.
        """
        super().__init__()

        self.log = _log.setup_logging('openstack')

        self.name = self.config.name
        self.auth = self.config.get_auth_args()
        self.default_interface = self.config.get_interface()
        self.force_ipv4 = self.config.force_ipv4

        (self.verify, self.cert) = self.config.get_requests_verify_args()

        # Turn off urllib3 warnings about insecure certs if we have
        # explicitly configured requests to tell it we do not want
        # cert verification
        if not self.verify:
            self.log.debug(
                "Turning off Insecure SSL warnings since verify=False")
            category = requestsexceptions.InsecureRequestWarning
            if category:
                # InsecureRequestWarning references a Warning class or is None
                warnings.filterwarnings('ignore', category=category)

        self._disable_warnings = {}

        cache_expiration_time = int(self.config.get_cache_expiration_time())
        cache_class = self.config.get_cache_class()
        cache_arguments = self.config.get_cache_arguments()

        self._resource_caches = {}

        if cache_class != 'dogpile.cache.null':
            self.cache_enabled = True
            self._cache = self._make_cache(
                cache_class, cache_expiration_time, cache_arguments)
            expirations = self.config.get_cache_expirations()
            for expire_key, expiration in expirations.items():
                # Only build caches for things we have list operations for
                if getattr(self, 'list_{0}'.format(expire_key), None):
                    self._resource_caches[expire_key] = self._make_cache(
                        cache_class, expiration, cache_arguments)

            self._SERVER_AGE = DEFAULT_SERVER_AGE
            self._PORT_AGE = DEFAULT_PORT_AGE
            self._FLOAT_AGE = DEFAULT_FLOAT_AGE
        else:
            self.cache_enabled = False

            def _fake_invalidate(unused):
                pass

            class _FakeCache:
                def invalidate(self):
                    pass

            # Don't cache list_servers if we're not caching things.
            # Replace this with a more specific cache configuration
            # soon.
            self._SERVER_AGE = 0
            self._PORT_AGE = 0
            self._FLOAT_AGE = 0
            self._cache = _FakeCache()
            # Undecorate cache decorated methods. Otherwise the call stacks
            # wind up being stupidly long and hard to debug
            for method in _utils._decorated_methods:
                meth_obj = getattr(self, method, None)
                if not meth_obj:
                    continue
                if (hasattr(meth_obj, 'invalidate')
                        and hasattr(meth_obj, 'func')):
                    new_func = functools.partial(meth_obj.func, self)
                    new_func.invalidate = _fake_invalidate
                    setattr(self, method, new_func)

        # If server expiration time is set explicitly, use that. Otherwise
        # fall back to whatever it was before
        self._SERVER_AGE = self.config.get_cache_resource_expiration(
            'server', self._SERVER_AGE)
        self._PORT_AGE = self.config.get_cache_resource_expiration(
            'port', self._PORT_AGE)
        self._FLOAT_AGE = self.config.get_cache_resource_expiration(
            'floating_ip', self._FLOAT_AGE)

        self._container_cache = dict()
        self._file_hash_cache = dict()

        # self.__pool_executor = None

        self._raw_clients = {}

        self._local_ipv6 = (
            _utils.localhost_supports_ipv6() if not self.force_ipv4 else False)

    def connect_as(self, **kwargs):
        """Make a new OpenStackCloud object with new auth context.

        Take the existing settings from the current cloud and construct a new
        OpenStackCloud object with some of the auth settings overridden. This
        is useful for getting an object to perform tasks with as another user,
        or in the context of a different project.

        .. code-block:: python

          conn = openstack.connect(cloud='example')
          # Work normally
          servers = conn.list_servers()
          conn2 = conn.connect_as(username='different-user', password='')
          # Work as different-user
          servers = conn2.list_servers()

        :param kwargs: keyword arguments can contain anything that would
                       normally go in an auth dict. They will override the same
                       settings from the parent cloud as appropriate. Entries
                       that do not want to be overridden can be omitted.
        :returns: A new instance of ``self.__class__`` built from the
                  overridden configuration.
        """

        if self.config._openstack_config:
            config = self.config._openstack_config
        else:
            # TODO(mordred) Replace this with from_session
            config = openstack.config.OpenStackConfig(
                app_name=self.config._app_name,
                app_version=self.config._app_version,
                load_yaml_config=False)
        params = copy.deepcopy(self.config.config)
        # Remove profile from current cloud so that overriding works
        params.pop('profile', None)

        # Utility function to help with the stripping below.
        def pop_keys(params, auth, name_key, id_key):
            if name_key in auth or id_key in auth:
                params['auth'].pop(name_key, None)
                params['auth'].pop(id_key, None)

        # If there are user, project or domain settings in the incoming auth
        # dict, strip out both id and name so that a user can say:
        #     cloud.connect_as(project_name='foo')
        # and have that work with clouds that have a project_id set in their
        # config.
        for prefix in ('user', 'project'):
            if prefix == 'user':
                name_key = 'username'
            else:
                name_key = 'project_name'
            id_key = '{prefix}_id'.format(prefix=prefix)
            pop_keys(params, kwargs, name_key, id_key)
            id_key = '{prefix}_domain_id'.format(prefix=prefix)
            name_key = '{prefix}_domain_name'.format(prefix=prefix)
            pop_keys(params, kwargs, name_key, id_key)

        params['auth'].update(kwargs)

        cloud_region = config.get_one(**params)
        # Attach the discovery cache from the old session so we won't
        # double discover.
        cloud_region._discovery_cache = self.session._discovery_cache
        # Override the cloud name so that logging/location work right
        cloud_region._name = self.name
        cloud_region.config['profile'] = self.name
        # Use self.__class__ so that we return whatever this is, like if it's
        # a subclass in the case of shade wrapping sdk.
        return self.__class__(config=cloud_region)

    def connect_as_project(self, project):
        """Make a new OpenStackCloud object with a new project.

        Take the existing settings from the current cloud and construct a new
        OpenStackCloud object with the project settings overridden. This
        is useful for getting an object to perform tasks in the context of a
        different project.

        .. code-block:: python

          cloud = openstack.connect(cloud='example')
          # Work normally
          servers = cloud.list_servers()
          cloud2 = cloud.connect_as_project('different-project')
          # Work in different-project
          servers = cloud2.list_servers()

        :param project: Either a project name or a project dict as returned by
                        `list_projects`.
        :returns: The result of :meth:`connect_as` with the project auth
                  settings derived from ``project``.
        """
        auth = {}
        if isinstance(project, dict):
            auth['project_id'] = project.get('id')
            auth['project_name'] = project.get('name')
            if project.get('domain_id'):
                auth['project_domain_id'] = project['domain_id']
        else:
            auth['project_name'] = project
        return self.connect_as(**auth)

    def global_request(self, global_request_id):
        """Make a new Connection object with a global request id set.

        Take the existing settings from the current Connection and construct a
        new Connection object with the global_request_id overridden.

        .. code-block:: python

          from oslo_context import context
          cloud = openstack.connect(cloud='example')
          # Work normally
          servers = cloud.list_servers()
          cloud2 = cloud.global_request(context.generate_request_id())
          # cloud2 sends all requests with global_request_id set
          servers = cloud2.list_servers()

        Additionally, this can be used as a context manager:

        .. code-block:: python

          from oslo_context import context
          c = openstack.connect(cloud='example')
          # Work normally
          servers = c.list_servers()
          with c.global_request(context.generate_request_id()) as c2:
              # c2 sends all requests with global_request_id set
              servers = c2.list_servers()

        :param global_request_id: The `global_request_id` to send.
        """
        params = copy.deepcopy(self.config.config)
        cloud_region = cloud_region_mod.from_session(
            session=self.session,
            app_name=self.config._app_name,
            app_version=self.config._app_version,
            discovery_cache=self.session._discovery_cache,
            **params)

        # Override the cloud name so that logging/location work right
        cloud_region._name = self.name
        cloud_region.config['profile'] = self.name
        # Use self.__class__ so that we return whatever this is, like if it's
        # a subclass in the case of shade wrapping sdk.
        new_conn = self.__class__(config=cloud_region)
        new_conn.set_global_request_id(global_request_id)
        return new_conn

    def _make_cache(self, cache_class, expiration_time, arguments):
        """Build and configure a dogpile cache region.

        The region uses :meth:`_make_cache_key` as its key generator.
        """
        return dogpile.cache.make_region(
            function_key_generator=self._make_cache_key
        ).configure(
            cache_class,
            expiration_time=expiration_time,
            arguments=arguments)

    def _make_cache_key(self, namespace, fn):
        """Return a key-generating callable for the cached function ``fn``.

        Keys are built from the cloud name (plus ``namespace`` when given),
        the function name and the sorted keyword arguments. The ``cache``
        keyword is left out of the key and positional arguments are
        currently not part of the key at all.
        """
        fname = fn.__name__
        if namespace is None:
            name_key = self.name
        else:
            name_key = '%s:%s' % (self.name, namespace)

        def generate_key(*args, **kwargs):
            # TODO(frickler): make handling arg keys actually work
            arg_key = ''
            kwargs_key = ','.join(
                '%s:%s' % (k, kwargs[k])
                for k in sorted(kwargs) if k != 'cache')
            return "_".join([str(name_key), fname, arg_key, kwargs_key])
        return generate_key

    def _get_cache(self, resource_name):
        """Return the per-resource cache, or the default cache if none."""
        if resource_name and resource_name in self._resource_caches:
            return self._resource_caches[resource_name]
        return self._cache

    def _get_major_version_id(self, version):
        """Extract the major version number from ``version``.

        :param version: An int, a version string such as ``'2'`` or
            ``'2.1'``, a tuple whose first item is the major version, or
            any other value (for example ``None``).
        :returns: The major version as an int for int, str and tuple
            input; any other value is returned unchanged.
        """
        if isinstance(version, int):
            return version
        if isinstance(version, str):
            # Split on the dot rather than taking the first character so
            # that multi-digit majors such as '10.0' yield 10 and not 1.
            return int(version.split('.')[0])
        if isinstance(version, tuple):
            return int(version[0])
        return version

    def _get_versioned_client(
            self, service_type, min_version=None, max_version=None):
        """Return an adapter for ``service_type`` honouring version limits.

        :param service_type: Service type to build the adapter for.
        :param min_version: Lowest version this code understands, if any.
        :param max_version: Highest version this code understands, if any.
        :returns: A ``proxy._ShadeAdapter``.
        :raises: OpenStackCloudException if the configured version lies
            outside the ``min_version``/``max_version`` bounds.
        """
        config_version = self.config.get_api_version(service_type)
        config_major = self._get_major_version_id(config_version)
        max_major = self._get_major_version_id(max_version)
        min_major = self._get_major_version_id(min_version)
        # TODO(shade) This should be replaced with use of Connection. However,
        #             we need to find a sane way to deal with this additional
        #             logic - or we need to give up on it. If we give up on it,
        #             we need to make sure we can still support it in the shade
        #             compat layer.
        # NOTE(mordred) This logic for versions is slightly different
        # than the ksa Adapter constructor logic. openstack.cloud knows the
        # versions it knows, and uses them when it detects them. However, if
        # a user requests a version, and it's not found, and a different one
        # openstack.cloud does know about is found, that's a warning in
        # openstack.cloud.
        if config_version:
            if min_major and config_major < min_major:
                raise exc.OpenStackCloudException(
                    "Version {config_version} requested for {service_type}"
                    " but shade understands a minimum of {min_version}".format(
                        config_version=config_version,
                        service_type=service_type,
                        min_version=min_version))
            elif max_major and config_major > max_major:
                raise exc.OpenStackCloudException(
                    "Version {config_version} requested for {service_type}"
                    " but openstack.cloud understands a maximum of"
                    " {max_version}".format(
                        config_version=config_version,
                        service_type=service_type,
                        max_version=max_version))
            request_min_version = config_version
            request_max_version = '{version}.latest'.format(
                version=config_major)
            adapter = proxy._ShadeAdapter(
                session=self.session,
                service_type=self.config.get_service_type(service_type),
                service_name=self.config.get_service_name(service_type),
                interface=self.config.get_interface(service_type),
                endpoint_override=self.config.get_endpoint(service_type),
                region_name=self.config.get_region_name(service_type),
                statsd_prefix=self.config.get_statsd_prefix(),
                statsd_client=self.config.get_statsd_client(),
                prometheus_counter=self.config.get_prometheus_counter(),
                prometheus_histogram=self.config.get_prometheus_histogram(),
                influxdb_client=self.config.get_influxdb_client(),
                min_version=request_min_version,
                max_version=request_max_version)
            if adapter.get_endpoint():
                return adapter

        # Either no version was configured or the configured one has no
        # endpoint: fall back to the caller supplied version bounds.
        adapter = proxy._ShadeAdapter(
            session=self.session,
            service_type=self.config.get_service_type(service_type),
            service_name=self.config.get_service_name(service_type),
            interface=self.config.get_interface(service_type),
            endpoint_override=self.config.get_endpoint(service_type),
            region_name=self.config.get_region_name(service_type),
            min_version=min_version,
            max_version=max_version)

        # data.api_version can be None if no version was detected, such
        # as with neutron
        api_version = adapter.get_api_major_version(
            endpoint_override=self.config.get_endpoint(service_type))
        api_major = self._get_major_version_id(api_version)

        # If we detect a different version than was configured, warn the
        # user. shade still knows what to do - but if the user gave us an
        # explicit version and we couldn't find it, they may want to
        # investigate.
        if api_version and config_version and (api_major != config_major):
            warning_msg = (
                '{service_type} is configured for {config_version}'
                ' but only {api_version} is available. shade is happy'
                ' with this version, but if you were trying to force an'
                ' override, that did not happen. You may want to check'
                ' your cloud, or remove the version specification from'
                ' your config.'.format(
                    service_type=service_type,
                    config_version=config_version,
                    api_version='.'.join([str(f) for f in api_version])))
            self.log.debug(warning_msg)
            warnings.warn(warning_msg)
        return adapter

    # TODO(shade) This should be replaced with using openstack Connection
    #             object.
    def _get_raw_client(
            self, service_type, api_version=None, endpoint_override=None):
        """Return an unversioned ``proxy._ShadeAdapter`` for a service.

        :param service_type: Service type to build the adapter for.
        :param api_version: Accepted but not used by this method.
        :param endpoint_override: Endpoint used only when the config has
            no endpoint for ``service_type``; the configured one wins.
        """
        return proxy._ShadeAdapter(
            session=self.session,
            service_type=self.config.get_service_type(service_type),
            service_name=self.config.get_service_name(service_type),
            interface=self.config.get_interface(service_type),
            endpoint_override=self.config.get_endpoint(
                service_type) or endpoint_override,
            region_name=self.config.get_region_name(service_type))

    def _is_client_version(self, client, version):
        """Ask the ``_<client>_client`` attribute if it matches ``version``.

        Dashes in ``client`` are turned into underscores to build the
        attribute name.
        """
        client_name = '_{client}_client'.format(
            client=client.replace('-', '_'))
        client = getattr(self, client_name)
        return client._version_matches(version)

    @property
    def _application_catalog_client(self):
        """Lazily created raw client for the application-catalog service."""
        if 'application-catalog' not in self._raw_clients:
            self._raw_clients['application-catalog'] = self._get_raw_client(
                'application-catalog')
        return self._raw_clients['application-catalog']

    @property
    def _database_client(self):
        """Lazily created raw client for the database service."""
        if 'database' not in self._raw_clients:
            self._raw_clients['database'] = self._get_raw_client('database')
        return self._raw_clients['database']

    @property
    def _raw_image_client(self):
        """Lazily created raw client for the image service."""
        if 'raw-image' not in self._raw_clients:
            image_client = self._get_raw_client('image')
            self._raw_clients['raw-image'] = image_client
        return self._raw_clients['raw-image']

    def pprint(self, resource):
        """Wrapper around pprint that groks munch objects"""
        # import late since this is a utility function
        import pprint
        new_resource = _utils._dictify_resource(resource)
        pprint.pprint(new_resource)

    def pformat(self, resource):
        """Wrapper around pformat that groks munch objects"""
        # import late since this is a utility function
        import pprint
        new_resource = _utils._dictify_resource(resource)
        return pprint.pformat(new_resource)

    @property
    def _keystone_catalog(self):
        """Service catalog object taken from the session's access info."""
        return self.session.auth.get_access(self.session).service_catalog

    @property
    def service_catalog(self):
        """The ``catalog`` attribute of the keystone service catalog."""
        return self._keystone_catalog.catalog

    def endpoint_for(self, service_type, interface=None, region_name=None):
        """Return the endpoint for a given service.

        Respects config values for Connection, including
        ``*_endpoint_override``. For direct values from the catalog
        regardless of overrides, see
        :meth:`~openstack.config.cloud_region.CloudRegion.get_endpoint_from_catalog`

        :param service_type: Service Type of the endpoint to search for.
        :param interface:
            Interface of the endpoint to search for. Optional, defaults to
            the configured value for interface for this Connection.
        :param region_name:
            Region Name of the endpoint to search for. Optional, defaults to
            the configured value for region_name for this Connection.

        :returns: The endpoint of the service, or None if not found.
        """

        endpoint_override = self.config.get_endpoint(service_type)
        if endpoint_override:
            return endpoint_override
        return self.config.get_endpoint_from_catalog(
            service_type=service_type,
            interface=interface,
            region_name=region_name)

    @property
    def auth_token(self):
        """Return the token from the session."""
        # Keystone's session will reuse a token if it is still valid.
        # We don't need to track validity here, just get_token() each time.
        return self.session.get_token()

    @property
    def current_user_id(self):
        """Get the id of the currently logged-in user from the token."""
        return self.session.auth.get_access(self.session).user_id

    @property
    def current_project_id(self):
        """Get the current project ID.

        Returns the project_id of the current token scope. None means that
        the token is domain scoped or unscoped.

        :raises keystoneauth1.exceptions.auth.AuthorizationFailure:
            if a new token fetch fails.
        :raises keystoneauth1.exceptions.auth_plugins.MissingAuthPlugin:
            if a plugin is not available.
        """
        return self.session.get_project_id()

    @property
    def current_project(self):
        """Return a ``munch.Munch`` describing the current project"""
        return self._get_project_info()

    def _get_project_info(self, project_id=None):
        """Return a ``munch.Munch`` with id, name and domain of a project.

        :param project_id: Project the information is wanted for. Name and
            domain are only filled in when this is empty or equals the
            current project id.
        """
        project_info = munch.Munch(
            id=project_id,
            name=None,
            domain_id=None,
            domain_name=None,
        )
        if not project_id or project_id == self.current_project_id:
            # If we don't have a project_id parameter, it means a user is
            # directly asking what the current state is.
            # Alternately, if we have one, that means we're calling this
            # from within a normalize function, which means the object has
            # a project_id associated with it. If the project_id matches
            # the project_id of our current token, that means we can supplement
            # the info with human readable info about names if we have them.
            # If they don't match, that means we're an admin who has pulled
            # an object from a different project, so adding info from the
            # current token would be wrong.
            auth_args = self.config.config.get('auth', {})
            project_info['id'] = self.current_project_id
            project_info['name'] = auth_args.get('project_name')
            project_info['domain_id'] = auth_args.get('project_domain_id')
            project_info['domain_name'] = auth_args.get('project_domain_name')
        return project_info

    @property
    def current_location(self):
        """Return a ``munch.Munch`` explaining the current cloud location."""
        return self._get_current_location()

    def _get_current_location(self, project_id=None, zone=None):
        """Build the location ``munch.Munch`` for a project and zone."""
        return munch.Munch(
            cloud=self.name,
            # TODO(efried): This is wrong, but it only seems to be used in a
            # repr; can we get rid of it?
            region_name=self.config.get_region_name(),
            zone=zone,
            project=self._get_project_info(project_id),
        )

    def _get_identity_location(self):
        '''Identity resources do not exist inside of projects.'''
        return munch.Munch(
            cloud=self.name,
            region_name=None,
            zone=None,
            project=munch.Munch(
                id=None,
                name=None,
                domain_id=None,
                domain_name=None))

    def _get_project_id_param_dict(self, name_or_id):
        """Return the project parameter dict for an identity request.

        :param name_or_id: Name or id of the project to look up.
        :returns: ``{}`` when no project is given or found, otherwise a
            dict keyed ``default_project_id`` (identity v3) or
            ``tenant_id`` with the project's id.
        """
        if not name_or_id:
            return {}
        project = self.get_project(name_or_id)
        if not project:
            return {}
        if self._is_client_version('identity', 3):
            return {'default_project_id': project['id']}
        return {'tenant_id': project['id']}

    def _get_domain_id_param_dict(self, domain_id):
        """Get a useable domain."""

        # Keystone v3 requires domains for user and project creation. v2 does
        # not. However, keystone v2 does not allow user creation by non-admin
        # users, so we can throw an error to the user that does not need to
        # mention api versions
        if not self._is_client_version('identity', 3):
            return {}
        if not domain_id:
            raise exc.OpenStackCloudException(
                "User or project creation requires an explicit"
                " domain_id argument.")
        return {'domain_id': domain_id}

    def _get_identity_params(self, domain_id=None, project=None):
        """Get the domain and project/tenant parameters if needed.

        keystone v2 and v3 are divergent enough that we need to pass or not
        pass project or tenant_id or domain or nothing in a sane manner.
        """
        ret = {}
        ret.update(self._get_domain_id_param_dict(domain_id))
        ret.update(self._get_project_id_param_dict(project))
        return ret

    def range_search(self, data, filters):
        """Perform integer range searches across a list of dictionaries.

        Given a list of dictionaries, search across the list using the given
        dictionary keys and a range of integer values for each key. Only
        dictionaries that match ALL search filters across the entire original
        data set will be returned.

        It is not a requirement that each dictionary contain the key used
        for searching. Those without the key will be considered non-matching.

        The range values must be string values and is either a set of digits
        representing an integer for matching, or a range operator followed by
        a set of digits representing an integer for matching. If a range
        operator is not given, exact value matching will be used. Valid
        operators are one of: <,>,<=,>=

        :param data: List of dictionaries to be searched.
        :param filters: Dict describing the one or more range searches to
            perform. If more than one search is given, the result will be the
            members of the original data set that match ALL searches. An
            example of filtering by multiple ranges::

                {"vcpus": "<=5", "ram": "<=2048", "disk": "1"}

        :returns: A list subset of the original data set.
        :raises: OpenStackCloudException on invalid range expressions.
        """
        filtered = []
        first_filter = True

        for key, range_value in filters.items():
            # We always want to operate on the full data set so that
            # calculations for minimum and maximum are correct.
            results = _utils.range_filter(data, key, range_value)

            if first_filter:
                # First set of results. Tracked with a flag and not with
                # the emptiness of ``filtered``: an empty intermediate
                # result must stay empty instead of being replaced by the
                # matches of the next filter.
                filtered = results
                first_filter = False
            else:
                # The combination of all searches should be the intersection
                # of all result sets from each search. Keep each surviving
                # entry once so equal entries are not multiplied.
                filtered = [r for r in filtered if r in results]

        return filtered

    def _get_and_munchify(self, key, data):
        """Wrapper around meta.get_and_munchify.

        Some of the methods expect a `meta` attribute to be passed in as
        part of the method signature. In those methods the meta param is
        overriding the meta module making the call to meta.get_and_munchify
        to fail.
        """
        if isinstance(data, requests.models.Response):
            data = proxy._json_response(data)
        return meta.get_and_munchify(key, data)

    def get_name(self):
        """Return the name of this cloud."""
        return self.name

    def get_session_endpoint(self, service_key, **kwargs):
        """Return the session endpoint for ``service_key``.

        :param service_key: Service to look the endpoint up for.
        :param kwargs: Passed through to the config's
            ``get_session_endpoint``.
        :returns: The endpoint, or None if keystoneauth reports that the
            endpoint was not found.
        :raises: OpenStackCloudException on any other failure.
        """
        try:
            return self.config.get_session_endpoint(service_key, **kwargs)
        except keystoneauth1.exceptions.catalog.EndpointNotFound as e:
            self.log.debug(
                "Endpoint not found in %s cloud: %s", self.name, str(e))
            return None
        except exc.OpenStackCloudException:
            raise
        except Exception as e:
            raise exc.OpenStackCloudException(
                "Error getting {service} endpoint on {cloud}:{region}:"
                " {error}".format(
                    service=service_key,
                    cloud=self.name,
                    region=self.config.get_region_name(service_key),
                    error=str(e))) from e

    def has_service(self, service_key, version=None):
        """Report whether the cloud offers ``service_key``.

        :param service_key: Service to check.
        :param version: When given, used as both the minimum and maximum
            version for the endpoint lookup.
        :returns: True if the service is enabled in the config and an
            endpoint is found, False otherwise.
        """
        if not self.config.has_service(service_key):
            # TODO(mordred) add a stamp here so that we only report this once
            if not self._disable_warnings.get(service_key):
                self.log.debug(
                    "Disabling %(service_key)s entry in catalog"
                    " per config", {'service_key': service_key})
                self._disable_warnings[service_key] = True
            return False

        kwargs = {}
        # If a specific version was requested - try it
        if version is not None:
            kwargs['min_version'] = version
            kwargs['max_version'] = version
        try:
            endpoint = self.get_session_endpoint(service_key, **kwargs)
        except exc.OpenStackCloudException:
            return False
        return bool(endpoint)

    def project_cleanup(
        self,
        dry_run=True,
        wait_timeout=120,
        status_queue=None,
        filters=None,
        resource_evaluation_fn=None
    ):
        """Cleanup the project resources.

        Cleanup all resources in all services, which provide cleanup methods.

        :param bool dry_run: Cleanup or only list identified resources.
        :param int wait_timeout: Maximum amount of time given to each service
            to complete the cleanup.
        :param queue status_queue: a threading queue object used to get current
            process status. The queue contain processed resources.
        :param dict filters: Additional filters for the cleanup (only resources
            matching all filters will be deleted, if there are no other
            dependencies).
        :param resource_evaluation_fn: A callback function, which will be
            invoked for each resource and must return True/False depending on
            whether resource need to be deleted or not.
        """
        dependencies = {}
        get_dep_fn_name = '_get_cleanup_dependencies'
        cleanup_fn_name = '_service_cleanup'
        # Compare against None so that an empty queue-like object handed in
        # by the caller is never replaced.
        if status_queue is None:
            status_queue = queue.Queue()

        # Collect the dependency declarations of every enabled service
        # that supports cleanup.
        for service in self.config.get_enabled_services():
            service_proxy = getattr(self, service, None)
            if (
                service_proxy
                and hasattr(service_proxy, get_dep_fn_name)
                and hasattr(service_proxy, cleanup_fn_name)
            ):
                deps = getattr(service_proxy, get_dep_fn_name)()
                if deps:
                    dependencies.update(deps)

        dep_graph = utils.TinyDAG()
        for k, v in dependencies.items():
            dep_graph.add_node(k)
            # 'before' is treated as optional, the same way 'after' is.
            for dep in v.get('before', []):
                dep_graph.add_node(dep)
                dep_graph.add_edge(k, dep)
            for dep in v.get('after', []):
                dep_graph.add_edge(dep, k)

        cleanup_resources = dict()

        for service in dep_graph.walk(timeout=wait_timeout):
            fn = None
            if hasattr(self, service):
                service_proxy = getattr(self, service)
                cleanup_fn = getattr(service_proxy, cleanup_fn_name, None)
                if cleanup_fn:
                    fn = functools.partial(
                        cleanup_fn,
                        dry_run=dry_run,
                        client_status_queue=status_queue,
                        identified_resources=cleanup_resources,
                        filters=filters,
                        resource_evaluation_fn=resource_evaluation_fn
                    )
            if fn:
                # NOTE(review): _pool_executor is not set in this class;
                # presumably the class combined with this mixin provides it.
                self._pool_executor.submit(
                    cleanup_task, dep_graph, service, fn
                )
            else:
                # Nothing to run for this node; mark it done so that the
                # graph walk can continue.
                dep_graph.node_done(service)

        for count in utils.iterate_timeout(
                timeout=wait_timeout,
                message="Timeout waiting for cleanup to finish",
                wait=1):
            if dep_graph.is_complete():
                return


def cleanup_task(graph, service, fn):
    """Run one service cleanup callable and mark its graph node done.

    Any exception raised by ``fn`` is logged and not re-raised;
    ``graph.node_done(service)`` is called in either case.

    :param graph: Dependency graph the service belongs to.
    :param service: Name of the service node being cleaned up.
    :param fn: Zero-argument callable doing the cleanup.
    """
    try:
        fn()
    except Exception:
        log = _log.setup_logging('openstack.project_cleanup')
        log.exception('Error in the %s cleanup function', service)
    finally:
        graph.node_done(service)