# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.

"""
test_project_cleanup
----------------------------------

Functional tests for project cleanup methods.
"""
import queue

from openstack.tests.functional import base


class TestProjectCleanup(base.BaseFunctionalTest):
    """Exercise ``project_cleanup`` against a small set of network resources.

    The test creates a network, subnet and router, runs several dry-run
    cleanups with different filters, and finally performs a real cleanup.
    """

    # NOTE(review): presumably consumed by the base class to select the
    # environment variable holding the wait timeout - confirm in base.
    _wait_for_timeout_key = 'OPENSTACKSDK_FUNC_TEST_TIMEOUT_CLEANUP'

    def setUp(self):
        """Select the connection under test and pick a unique network name."""
        super().setUp()
        self.conn = self.user_cloud_alt
        self.network_name = self.getUniqueString('network')

    @staticmethod
    def _drain_queue(status_queue):
        """Remove and return every item currently held in ``status_queue``.

        :param status_queue: ``queue.Queue`` that ``project_cleanup`` fills
            with the resources it identified.
        :returns: List of the queued items, in the order they were queued.
        """
        drained = []
        while not status_queue.empty():
            drained.append(status_queue.get())
        return drained

    def _create_network_resources(self):
        """Create a network, a subnet on it, and a router attached to it.

        The created resources are stored on ``self.net``, ``self.subnet``
        and ``self.router``.
        """
        conn = self.conn
        self.net = conn.network.create_network(
            name=self.network_name,
        )
        self.subnet = conn.network.create_subnet(
            name=self.getUniqueString('subnet'),
            network_id=self.net.id,
            cidr='192.169.1.0/24',
            ip_version=4,
        )
        self.router = conn.network.create_router(
            name=self.getUniqueString('router')
        )
        conn.network.add_interface_to_router(
            self.router.id,
            subnet_id=self.subnet.id)

    def test_cleanup(self):
        """Run dry-run cleanups with varying filters, then a real cleanup."""
        self._create_network_resources()
        status_queue = queue.Queue()

        # First round - check no resources are old enough
        self.conn.project_cleanup(
            dry_run=True,
            wait_timeout=120,
            status_queue=status_queue,
            filters={'created_at': '2000-01-01'})

        self.assertTrue(status_queue.empty())

        # Second round - resource evaluation function returns False, ensure
        # nothing is identified
        self.conn.project_cleanup(
            dry_run=True,
            wait_timeout=120,
            status_queue=status_queue,
            filters={'created_at': '2200-01-01'},
            resource_evaluation_fn=lambda x, y, z: False)

        self.assertTrue(status_queue.empty())

        # Third round - filter date lies far in the future, so the freshly
        # created resources are expected to be reported
        self.conn.project_cleanup(
            dry_run=True,
            wait_timeout=120,
            status_queue=status_queue,
            filters={'created_at': '2200-01-01'})

        # At least known networks should be identified
        identified_names = [
            obj.name for obj in self._drain_queue(status_queue)
        ]
        self.assertIn(self.network_name, identified_names)

        # Fourth round - dry run with no filters, ensure everything identified
        self.conn.project_cleanup(
            dry_run=True,
            wait_timeout=120,
            status_queue=status_queue)

        identified_names = [
            obj.name for obj in self._drain_queue(status_queue)
        ]
        self.assertIn(self.network_name, identified_names)

        # Ensure network still exists (all rounds so far were dry runs)
        net = self.conn.network.get_network(self.net.id)
        self.assertEqual(net.name, self.net.name)

        # Last round - do a real cleanup. The queue contents are not
        # inspected here; the result is verified by listing networks.
        self.conn.project_cleanup(
            dry_run=False,
            wait_timeout=600,
            status_queue=status_queue)

        remaining_names = [net.name for net in self.conn.network.networks()]
        # Since we might not have enough privs to drop all nets - ensure
        # we do not have our known one
        self.assertNotIn(self.network_name, remaining_names)