# This file is part of cloud-init. See LICENSE file for license information.

"""Unit tests for the IBMCloud datasource (DataSourceIBMCloud)."""

import base64
import copy
import json
from textwrap import dedent

from cloudinit import util
from cloudinit.helpers import Paths
from cloudinit.sources import DataSourceIBMCloud as ibm
from cloudinit.tests import helpers as test_helpers

mock = test_helpers.mock

# Dotted prefix of the module under test; used to build mock.patch targets.
D_PATH = "cloudinit.sources.DataSourceIBMCloud."


# Class-level patches are applied to every test_* method.  The mocks are
# passed bottom-up, so each test receives (m_blkid, m_is_prov, _m_xen).
@mock.patch(D_PATH + "_is_xen", return_value=True)
@mock.patch(D_PATH + "_is_ibm_provisioning")
@mock.patch(D_PATH + "util.blkid")
class TestGetIBMPlatform(test_helpers.CiTestCase):
    """Test the get_ibm_platform helper."""

    # blkid output for the two image partitions present in every scenario.
    blkid_base = {
        "/dev/xvda1": {
            "DEVNAME": "/dev/xvda1", "LABEL": "cloudimg-bootfs",
            "TYPE": "ext3"},
        "/dev/xvda2": {
            "DEVNAME": "/dev/xvda2", "LABEL": "cloudimg-rootfs",
            "TYPE": "ext4"},
    }

    # Extra blkid entry for a disk labelled METADATA (template case).
    blkid_metadata_disk = {
        "/dev/xvdh1": {
            "DEVNAME": "/dev/xvdh1", "LABEL": "METADATA", "TYPE": "vfat",
            "SEC_TYPE": "msdos", "UUID": "681B-8C5D",
            "PARTUUID": "3d631e09-01"},
    }

    # Extra blkid entry for a config-2 disk carrying the IBM config UUID
    # (os_code case).
    blkid_oscode_disk = {
        "/dev/xvdh": {
            "DEVNAME": "/dev/xvdh", "LABEL": "config-2", "TYPE": "vfat",
            "SEC_TYPE": "msdos", "UUID": ibm.IBM_CONFIG_UUID}
    }

    def setUp(self):
        """Build per-test blkid dictionaries from the class-level fixtures.

        Deep copies are used so that a test may mutate its blkid data
        without leaking the change into other tests.
        """
        # Run the base-class fixture setup as well, consistent with the
        # other setUp override in this module.
        super(TestGetIBMPlatform, self).setUp()

        self.blkid_metadata = copy.deepcopy(self.blkid_base)
        self.blkid_metadata.update(copy.deepcopy(self.blkid_metadata_disk))

        self.blkid_oscode = copy.deepcopy(self.blkid_base)
        self.blkid_oscode.update(copy.deepcopy(self.blkid_oscode_disk))

    def test_id_template_live_metadata(self, m_blkid, m_is_prov, _m_xen):
        """identify TEMPLATE_LIVE_METADATA."""
        m_blkid.return_value = self.blkid_metadata
        m_is_prov.return_value = False
        self.assertEqual(
            (ibm.Platforms.TEMPLATE_LIVE_METADATA, "/dev/xvdh1"),
            ibm.get_ibm_platform())

    def test_id_template_prov_metadata(self, m_blkid, m_is_prov, _m_xen):
        """identify TEMPLATE_PROVISIONING_METADATA."""
        m_blkid.return_value = self.blkid_metadata
        m_is_prov.return_value = True
        self.assertEqual(
            (ibm.Platforms.TEMPLATE_PROVISIONING_METADATA, "/dev/xvdh1"),
            ibm.get_ibm_platform())

    def test_id_template_prov_nodata(self, m_blkid, m_is_prov, _m_xen):
        """identify TEMPLATE_PROVISIONING_NODATA."""
        m_blkid.return_value = self.blkid_base
        m_is_prov.return_value = True
        self.assertEqual(
            (ibm.Platforms.TEMPLATE_PROVISIONING_NODATA, None),
            ibm.get_ibm_platform())

    def test_id_os_code(self, m_blkid, m_is_prov, _m_xen):
        """Identify OS_CODE."""
        m_blkid.return_value = self.blkid_oscode
        m_is_prov.return_value = False
        self.assertEqual((ibm.Platforms.OS_CODE, "/dev/xvdh"),
                         ibm.get_ibm_platform())

    def test_id_os_code_must_match_uuid(self, m_blkid, m_is_prov, _m_xen):
        """Test against false positive on openstack with non-ibm UUID."""
        # self.blkid_oscode is a per-test deep copy (see setUp), so
        # overwriting the UUID here does not affect other tests.
        blkid = self.blkid_oscode
        blkid["/dev/xvdh"]["UUID"] = "9999-9999"
        m_blkid.return_value = blkid
        m_is_prov.return_value = False
        self.assertEqual((None, None), ibm.get_ibm_platform())


# Mocks are passed bottom-up: each test receives (m_platform, m_sysuuid).
@mock.patch(D_PATH + "_read_system_uuid", return_value=None)
@mock.patch(D_PATH + "get_ibm_platform")
class TestReadMD(test_helpers.CiTestCase):
    """Test the read_datasource helper."""

    # meta_data.json content used for the template (METADATA disk) case.
    template_md = {
        "files": [],
        "network_config": {"content_path": "/content/interfaces"},
        "hostname": "ci-fond-ram",
        "name": "ci-fond-ram",
        "domain": "testing.ci.cloud-init.org",
        "meta": {"dsmode": "net"},
        "uuid": "8e636730-9f5d-c4a5-327c-d7123c46e82f",
        "public_keys": {"1091307": "ssh-rsa AAAAB3NzaC1...Hw== ci-pubkey"},
    }

    # meta_data.json content used for the os_code (config-2 disk) case.
    oscode_md = {
        "hostname": "ci-grand-gannet.testing.ci.cloud-init.org",
        "name": "ci-grand-gannet",
        "uuid": "2f266908-8e6c-4818-9b5c-42e9cc66a785",
        "random_seed": "bm90LXJhbmRvbQo=",
        "crypt_key": "ssh-rsa AAAAB3NzaC1yc2..n6z/",
        "configuration_token": "eyJhbGciOi..M3ZA",
        "public_keys": {"1091307": "ssh-rsa AAAAB3N..Hw== ci-pubkey"},
    }

    content_interfaces = dedent("""\
        auto lo
        iface lo inet loopback

        auto eth0
        allow-hotplug eth0
        iface eth0 inet static
        address 10.82.43.5
        netmask 255.255.255.192
        """)

    userdata = b"#!/bin/sh\necho hi mom\n"
    # meta.js file gets json encoded userdata as a list.
    # NOTE(review): the \n escapes below are real newline characters
    # inside the JSON string, which a strict JSON parser rejects.  The
    # tests here only write this file; confirm nothing parses it.
    meta_js = '["#!/bin/sh\necho hi mom\n"]'
    vendor_data = {
        "cloud-init": "#!/bin/bash\necho 'root:$6$5ab01p1m1' | chpasswd -e"}

    network_data = {
        "links": [
            {"id": "interface_29402281", "name": "eth0", "mtu": None,
             "type": "phy", "ethernet_mac_address": "06:00:f1:bd:da:25"},
            {"id": "interface_29402279", "name": "eth1", "mtu": None,
             "type": "phy", "ethernet_mac_address": "06:98:5e:d0:7f:86"}
        ],
        "networks": [
            {"id": "network_109887563", "link": "interface_29402281",
             "type": "ipv4", "ip_address": "10.82.43.2",
             "netmask": "255.255.255.192",
             "routes": [
                 {"network": "10.0.0.0", "netmask": "255.0.0.0",
                  "gateway": "10.82.43.1"},
                 {"network": "161.26.0.0", "netmask": "255.255.0.0",
                  "gateway": "10.82.43.1"}]},
            {"id": "network_109887551", "link": "interface_29402279",
             "type": "ipv4", "ip_address": "108.168.194.252",
             "netmask": "255.255.255.248",
             "routes": [
                 {"network": "0.0.0.0", "netmask": "0.0.0.0",
                  "gateway": "108.168.194.249"}]}
        ],
        "services": [
            {"type": "dns", "address": "10.0.80.11"},
            {"type": "dns", "address": "10.0.80.12"}
        ],
    }

    sysuuid = '7f79ebf5-d791-43c3-a723-854e8389d59f'

    def _get_expected_metadata(self, os_md):
        """return expected 'metadata' for data loaded from meta_data.json.

        Keys listed in the rename table are copied under their new name
        when present in os_md; a 'random_seed' value is base64-decoded.
        Every other key of os_md is left out of the result.
        """
        os_md = copy.deepcopy(os_md)
        renames = (
            ('hostname', 'local-hostname'),
            ('uuid', 'instance-id'),
            ('public_keys', 'public-keys'))
        ret = {}
        for osname, mdname in renames:
            if osname in os_md:
                ret[mdname] = os_md[osname]
        if 'random_seed' in os_md:
            ret['random_seed'] = base64.b64decode(os_md['random_seed'])

        return ret

    def test_provisioning_md(self, m_platform, m_sysuuid):
        """Provisioning env with a metadata disk should return None."""
        m_platform.return_value = (
            ibm.Platforms.TEMPLATE_PROVISIONING_METADATA, "/dev/xvdh")
        self.assertIsNone(ibm.read_md())

    def test_provisioning_no_metadata(self, m_platform, m_sysuuid):
        """Provisioning env with no metadata disk should return None."""
        m_platform.return_value = (
            ibm.Platforms.TEMPLATE_PROVISIONING_NODATA, None)
        self.assertIsNone(ibm.read_md())

    def test_provisioning_not_ibm(self, m_platform, m_sysuuid):
        """Provisioning env but not identified as IBM should return None."""
        m_platform.return_value = (None, None)
        self.assertIsNone(ibm.read_md())

    def test_template_live(self, m_platform, m_sysuuid):
        """Template live environment should be identified."""
        # A populated temporary directory stands in for the metadata
        # device path reported by get_ibm_platform.
        tmpdir = self.tmp_dir()
        m_platform.return_value = (
            ibm.Platforms.TEMPLATE_LIVE_METADATA, tmpdir)
        m_sysuuid.return_value = self.sysuuid

        test_helpers.populate_dir(tmpdir, {
            'openstack/latest/meta_data.json': json.dumps(self.template_md),
            'openstack/latest/user_data': self.userdata,
            'openstack/content/interfaces': self.content_interfaces,
            'meta.js': self.meta_js})

        ret = ibm.read_md()
        self.assertEqual(ibm.Platforms.TEMPLATE_LIVE_METADATA,
                         ret['platform'])
        self.assertEqual(tmpdir, ret['source'])
        self.assertEqual(self.userdata, ret['userdata'])
        self.assertEqual(self._get_expected_metadata(self.template_md),
                         ret['metadata'])
        self.assertEqual(self.sysuuid, ret['system-uuid'])

    def test_os_code_live(self, m_platform, m_sysuuid):
        """Verify an os_code metadata path."""
        tmpdir = self.tmp_dir()
        m_platform.return_value = (ibm.Platforms.OS_CODE, tmpdir)
        netdata = json.dumps(self.network_data)
        test_helpers.populate_dir(tmpdir, {
            'openstack/latest/meta_data.json': json.dumps(self.oscode_md),
            'openstack/latest/user_data': self.userdata,
            'openstack/latest/vendor_data.json': json.dumps(self.vendor_data),
            'openstack/latest/network_data.json': netdata,
        })

        ret = ibm.read_md()
        self.assertEqual(ibm.Platforms.OS_CODE, ret['platform'])
        self.assertEqual(tmpdir, ret['source'])
        self.assertEqual(self.userdata, ret['userdata'])
        self.assertEqual(self._get_expected_metadata(self.oscode_md),
                         ret['metadata'])

    def test_os_code_live_no_userdata(self, m_platform, m_sysuuid):
        """Verify os_code without user-data."""
        tmpdir = self.tmp_dir()
        m_platform.return_value = (ibm.Platforms.OS_CODE, tmpdir)
        # No user_data file is written, so 'userdata' is expected to be
        # None in the result.
        test_helpers.populate_dir(tmpdir, {
            'openstack/latest/meta_data.json': json.dumps(self.oscode_md),
            'openstack/latest/vendor_data.json': json.dumps(self.vendor_data),
        })

        ret = ibm.read_md()
        self.assertEqual(ibm.Platforms.OS_CODE, ret['platform'])
        self.assertEqual(tmpdir, ret['source'])
        self.assertIsNone(ret['userdata'])
        self.assertEqual(self._get_expected_metadata(self.oscode_md),
                         ret['metadata'])


class TestIsIBMProvisioning(test_helpers.FilesystemMockingTestCase):
    """Test the _is_ibm_provisioning method."""
    inst_log = "/root/swinstall.log"
    prov_cfg = "/root/provisioningConfiguration.cfg"
    boot_ref = "/proc/1/environ"
    # The tests below inspect self.logs, so log capture is enabled.
    with_logs = True

    def _call_with_root(self, rootd):
        """Re-root the filesystem at rootd and call _is_ibm_provisioning."""
        self.reRoot(rootd)
        return ibm._is_ibm_provisioning()

    def test_no_config(self):
        """No provisioning config means not provisioning."""
        self.assertFalse(self._call_with_root(self.tmp_dir()))

    def test_config_only(self):
        """A provisioning config without a log means provisioning."""
        rootd = self.tmp_dir()
        test_helpers.populate_dir(rootd, {self.prov_cfg: "key=value"})
        self.assertTrue(self._call_with_root(rootd))

    def test_config_with_old_log(self):
        """A config with a log from previous boot is not provisioning."""
        rootd = self.tmp_dir()
        # Each value is (content, timestamp offset) for
        # populate_dir_with_ts; here the log offset (-30) is lower than
        # the boot reference offset (0).
        # NOTE(review): offset units are presumably seconds - confirm.
        data = {self.prov_cfg: ("key=value\nkey2=val2\n", -10),
                self.inst_log: ("log data\n", -30),
                self.boot_ref: ("PWD=/", 0)}
        test_helpers.populate_dir_with_ts(rootd, data)
        self.assertFalse(self._call_with_root(rootd=rootd))
        self.assertIn("from previous boot", self.logs.getvalue())

    def test_config_with_new_log(self):
        """A config with a log from this boot is provisioning."""
        rootd = self.tmp_dir()
        # Here the log offset (30) is higher than the boot reference
        # offset (0).
        data = {self.prov_cfg: ("key=value\nkey2=val2\n", -10),
                self.inst_log: ("log data\n", 30),
                self.boot_ref: ("PWD=/", 0)}
        test_helpers.populate_dir_with_ts(rootd, data)
        self.assertTrue(self._call_with_root(rootd=rootd))
        self.assertIn("from current boot", self.logs.getvalue())

    def test_config_and_log_no_reference(self):
        """If the config and log existed, but no reference, assume not."""
        rootd = self.tmp_dir()
        test_helpers.populate_dir(
            rootd, {self.prov_cfg: "key=value", self.inst_log: "log data\n"})
        self.assertFalse(self._call_with_root(rootd=rootd))
        self.assertIn("no reference file", self.logs.getvalue())


class TestDataSourceIBMCloud(test_helpers.CiTestCase):
    """Test DataSourceIBMCloud.get_data against a mocked read_md."""

    def setUp(self):
        """Create a datasource instance backed by temporary directories."""
        super(TestDataSourceIBMCloud, self).setUp()
        self.tmp = self.tmp_dir()
        self.cloud_dir = self.tmp_path('cloud', dir=self.tmp)
        util.ensure_dir(self.cloud_dir)
        paths = Paths({'run_dir': self.tmp, 'cloud_dir': self.cloud_dir})
        self.ds = ibm.DataSourceIBMCloud(
            sys_cfg={}, distro=None, paths=paths)

    def test_get_data_false(self):
        """When read_md returns None, get_data returns False."""
        with mock.patch(D_PATH + 'read_md', return_value=None):
            self.assertFalse(self.ds.get_data())

    def test_get_data_processes_read_md(self):
        """get_data processes and caches content returned by read_md."""
        md = {
            'metadata': {}, 'networkdata': 'net', 'platform': 'plat',
            'source': 'src', 'system-uuid': 'uuid', 'userdata': 'ud',
            'vendordata': 'vd'}
        with mock.patch(D_PATH + 'read_md', return_value=md):
            self.assertTrue(self.ds.get_data())
        self.assertEqual('src', self.ds.source)
        self.assertEqual('plat', self.ds.platform)
        self.assertEqual({}, self.ds.metadata)
        self.assertEqual('ud', self.ds.userdata_raw)
        self.assertEqual('net', self.ds.network_json)
        self.assertEqual('vd', self.ds.vendordata_pure)
        self.assertEqual('uuid', self.ds.system_uuid)
        self.assertEqual('ibmcloud', self.ds.cloud_name)
        self.assertEqual('ibmcloud', self.ds.platform_type)
        self.assertEqual('plat (src)', self.ds.subplatform)

# vi: ts=4 expandtab