1# Copyright (C) 2015 Canonical Ltd. 2# Copyright (C) 2016 VMware INC. 3# 4# Author: Sankar Tanguturi <stanguturi@vmware.com> 5# Pengpeng Sun <pengpengs@vmware.com> 6# 7# This file is part of cloud-init. See LICENSE file for license information. 8 9import logging 10import os 11import sys 12import tempfile 13import textwrap 14 15from cloudinit.sources.DataSourceOVF import get_network_config_from_conf 16from cloudinit.sources.DataSourceOVF import read_vmware_imc 17from cloudinit.sources.helpers.vmware.imc.boot_proto import BootProtoEnum 18from cloudinit.sources.helpers.vmware.imc.config import Config 19from cloudinit.sources.helpers.vmware.imc.config_file import ConfigFile 20from cloudinit.sources.helpers.vmware.imc.config_nic import gen_subnet 21from cloudinit.sources.helpers.vmware.imc.config_nic import NicConfigurator 22from cloudinit.tests.helpers import CiTestCase 23 24logging.basicConfig(level=logging.DEBUG, stream=sys.stdout) 25logger = logging.getLogger(__name__) 26 27 28class TestVmwareConfigFile(CiTestCase): 29 30 def test_utility_methods(self): 31 """Tests basic utility methods of ConfigFile class""" 32 cf = ConfigFile("tests/data/vmware/cust-dhcp-2nic.cfg") 33 34 cf.clear() 35 36 self.assertEqual(0, len(cf), "clear size") 37 38 cf._insertKey(" PASSWORD|-PASS ", " foo ") 39 cf._insertKey("BAR", " ") 40 41 self.assertEqual(2, len(cf), "insert size") 42 self.assertEqual('foo', cf["PASSWORD|-PASS"], "password") 43 self.assertTrue("PASSWORD|-PASS" in cf, "hasPassword") 44 self.assertFalse(cf.should_keep_current_value("PASSWORD|-PASS"), 45 "keepPassword") 46 self.assertFalse(cf.should_remove_current_value("PASSWORD|-PASS"), 47 "removePassword") 48 self.assertFalse("FOO" in cf, "hasFoo") 49 self.assertTrue(cf.should_keep_current_value("FOO"), "keepFoo") 50 self.assertFalse(cf.should_remove_current_value("FOO"), "removeFoo") 51 self.assertTrue("BAR" in cf, "hasBar") 52 self.assertFalse(cf.should_keep_current_value("BAR"), "keepBar") 53 self.assertTrue(cf.should_remove_current_value("BAR"), "removeBar") 54 55 def test_datasource_instance_id(self): 56 """Tests instance id for the DatasourceOVF""" 57 cf = ConfigFile("tests/data/vmware/cust-dhcp-2nic.cfg") 58 59 instance_id_prefix = 'iid-vmware-' 60 61 conf = Config(cf) 62 63 (md1, _, _) = read_vmware_imc(conf) 64 self.assertIn(instance_id_prefix, md1["instance-id"]) 65 self.assertEqual(md1["instance-id"], 'iid-vmware-imc') 66 67 (md2, _, _) = read_vmware_imc(conf) 68 self.assertIn(instance_id_prefix, md2["instance-id"]) 69 self.assertEqual(md2["instance-id"], 'iid-vmware-imc') 70 71 self.assertEqual(md2["instance-id"], md1["instance-id"]) 72 73 def test_configfile_static_2nics(self): 74 """Tests Config class for a configuration with two static NICs.""" 75 cf = ConfigFile("tests/data/vmware/cust-static-2nic.cfg") 76 77 conf = Config(cf) 78 79 self.assertEqual('myhost1', conf.host_name, "hostName") 80 self.assertEqual('Africa/Abidjan', conf.timezone, "tz") 81 self.assertTrue(conf.utc, "utc") 82 83 self.assertEqual(['10.20.145.1', '10.20.145.2'], 84 conf.name_servers, 85 "dns") 86 self.assertEqual(['eng.vmware.com', 'proxy.vmware.com'], 87 conf.dns_suffixes, 88 "suffixes") 89 90 nics = conf.nics 91 ipv40 = nics[0].staticIpv4 92 93 self.assertEqual(2, len(nics), "nics") 94 self.assertEqual('NIC1', nics[0].name, "nic0") 95 self.assertEqual('00:50:56:a6:8c:08', nics[0].mac, "mac0") 96 self.assertEqual(BootProtoEnum.STATIC, nics[0].bootProto, "bootproto0") 97 self.assertEqual('10.20.87.154', ipv40[0].ip, "ipv4Addr0") 98 self.assertEqual('255.255.252.0', ipv40[0].netmask, "ipv4Mask0") 99 self.assertEqual(2, len(ipv40[0].gateways), "ipv4Gw0") 100 self.assertEqual('10.20.87.253', ipv40[0].gateways[0], "ipv4Gw0_0") 101 self.assertEqual('10.20.87.105', ipv40[0].gateways[1], "ipv4Gw0_1") 102 103 self.assertEqual(1, len(nics[0].staticIpv6), "ipv6Cnt0") 104 self.assertEqual('fc00:10:20:87::154', 105 nics[0].staticIpv6[0].ip, 106 "ipv6Addr0") 107 108 self.assertEqual('NIC2', nics[1].name, "nic1") 109 self.assertTrue(not nics[1].staticIpv6, "ipv61 dhcp") 110 111 def test_config_file_dhcp_2nics(self): 112 """Tests Config class for a configuration with two DHCP NICs.""" 113 cf = ConfigFile("tests/data/vmware/cust-dhcp-2nic.cfg") 114 115 conf = Config(cf) 116 nics = conf.nics 117 self.assertEqual(2, len(nics), "nics") 118 self.assertEqual('NIC1', nics[0].name, "nic0") 119 self.assertEqual('00:50:56:a6:8c:08', nics[0].mac, "mac0") 120 self.assertEqual(BootProtoEnum.DHCP, nics[0].bootProto, "bootproto0") 121 122 def test_config_password(self): 123 cf = ConfigFile("tests/data/vmware/cust-dhcp-2nic.cfg") 124 125 cf._insertKey("PASSWORD|-PASS", "test-password") 126 cf._insertKey("PASSWORD|RESET", "no") 127 128 conf = Config(cf) 129 self.assertEqual('test-password', conf.admin_password, "password") 130 self.assertFalse(conf.reset_password, "do not reset password") 131 132 def test_config_reset_passwd(self): 133 cf = ConfigFile("tests/data/vmware/cust-dhcp-2nic.cfg") 134 135 cf._insertKey("PASSWORD|-PASS", "test-password") 136 cf._insertKey("PASSWORD|RESET", "random") 137 138 conf = Config(cf) 139 with self.assertRaises(ValueError): 140 pw = conf.reset_password 141 self.assertIsNone(pw) 142 143 cf.clear() 144 cf._insertKey("PASSWORD|RESET", "yes") 145 self.assertEqual(1, len(cf), "insert size") 146 147 conf = Config(cf) 148 self.assertTrue(conf.reset_password, "reset password") 149 150 def test_get_config_nameservers(self): 151 """Tests DNS and nameserver settings in a configuration.""" 152 cf = ConfigFile("tests/data/vmware/cust-static-2nic.cfg") 153 154 config = Config(cf) 155 156 network_config = get_network_config_from_conf(config, False) 157 158 self.assertEqual(1, network_config.get('version')) 159 160 config_types = network_config.get('config') 161 name_servers = None 162 dns_suffixes = None 163 164 for type in config_types: 165 if type.get('type') == 'nameserver': 166 name_servers = type.get('address') 167 dns_suffixes = type.get('search') 168 break 169 170 self.assertEqual(['10.20.145.1', '10.20.145.2'], 171 name_servers, 172 "dns") 173 self.assertEqual(['eng.vmware.com', 'proxy.vmware.com'], 174 dns_suffixes, 175 "suffixes") 176 177 def test_gen_subnet(self): 178 """Tests if gen_subnet properly calculates network subnet from 179 IPv4 address and netmask""" 180 ip_subnet_list = [['10.20.87.253', '255.255.252.0', '10.20.84.0'], 181 ['10.20.92.105', '255.255.252.0', '10.20.92.0'], 182 ['192.168.0.10', '255.255.0.0', '192.168.0.0']] 183 for entry in ip_subnet_list: 184 self.assertEqual(entry[2], gen_subnet(entry[0], entry[1]), 185 "Subnet for a specified ip and netmask") 186 187 def test_get_config_dns_suffixes(self): 188 """Tests if get_network_config_from_conf properly 189 generates nameservers and dns settings from a 190 specified configuration""" 191 cf = ConfigFile("tests/data/vmware/cust-dhcp-2nic.cfg") 192 193 config = Config(cf) 194 195 network_config = get_network_config_from_conf(config, False) 196 197 self.assertEqual(1, network_config.get('version')) 198 199 config_types = network_config.get('config') 200 name_servers = None 201 dns_suffixes = None 202 203 for type in config_types: 204 if type.get('type') == 'nameserver': 205 name_servers = type.get('address') 206 dns_suffixes = type.get('search') 207 break 208 209 self.assertEqual([], 210 name_servers, 211 "dns") 212 self.assertEqual(['eng.vmware.com'], 213 dns_suffixes, 214 "suffixes") 215 216 def test_get_nics_list_dhcp(self): 217 """Tests if NicConfigurator properly calculates network subnets 218 for a configuration with a list of DHCP NICs""" 219 cf = ConfigFile("tests/data/vmware/cust-dhcp-2nic.cfg") 220 221 config = Config(cf) 222 223 nicConfigurator = NicConfigurator(config.nics, False) 224 nics_cfg_list = nicConfigurator.generate() 225 226 self.assertEqual(2, len(nics_cfg_list), "number of config elements") 227 228 nic1 = {'name': 'NIC1'} 229 nic2 = {'name': 'NIC2'} 230 for cfg in nics_cfg_list: 231 if cfg.get('name') == nic1.get('name'): 232 nic1.update(cfg) 233 elif cfg.get('name') == nic2.get('name'): 234 nic2.update(cfg) 235 236 self.assertEqual('physical', nic1.get('type'), 'type of NIC1') 237 self.assertEqual('NIC1', nic1.get('name'), 'name of NIC1') 238 self.assertEqual('00:50:56:a6:8c:08', nic1.get('mac_address'), 239 'mac address of NIC1') 240 subnets = nic1.get('subnets') 241 self.assertEqual(1, len(subnets), 'number of subnets for NIC1') 242 subnet = subnets[0] 243 self.assertEqual('dhcp', subnet.get('type'), 'DHCP type for NIC1') 244 self.assertEqual('auto', subnet.get('control'), 'NIC1 Control type') 245 246 self.assertEqual('physical', nic2.get('type'), 'type of NIC2') 247 self.assertEqual('NIC2', nic2.get('name'), 'name of NIC2') 248 self.assertEqual('00:50:56:a6:5a:de', nic2.get('mac_address'), 249 'mac address of NIC2') 250 subnets = nic2.get('subnets') 251 self.assertEqual(1, len(subnets), 'number of subnets for NIC2') 252 subnet = subnets[0] 253 self.assertEqual('dhcp', subnet.get('type'), 'DHCP type for NIC2') 254 self.assertEqual('auto', subnet.get('control'), 'NIC2 Control type') 255 256 def test_get_nics_list_static(self): 257 """Tests if NicConfigurator properly calculates network subnets 258 for a configuration with 2 static NICs""" 259 cf = ConfigFile("tests/data/vmware/cust-static-2nic.cfg") 260 261 config = Config(cf) 262 263 nicConfigurator = NicConfigurator(config.nics, False) 264 nics_cfg_list = nicConfigurator.generate() 265 266 self.assertEqual(2, len(nics_cfg_list), "number of elements") 267 268 nic1 = {'name': 'NIC1'} 269 nic2 = {'name': 'NIC2'} 270 route_list = [] 271 for cfg in nics_cfg_list: 272 cfg_type = cfg.get('type') 273 if cfg_type == 'physical': 274 if cfg.get('name') == nic1.get('name'): 275 nic1.update(cfg) 276 elif cfg.get('name') == nic2.get('name'): 277 nic2.update(cfg) 278 279 self.assertEqual('physical', nic1.get('type'), 'type of NIC1') 280 self.assertEqual('NIC1', nic1.get('name'), 'name of NIC1') 281 self.assertEqual('00:50:56:a6:8c:08', nic1.get('mac_address'), 282 'mac address of NIC1') 283 284 subnets = nic1.get('subnets') 285 self.assertEqual(2, len(subnets), 'Number of subnets') 286 287 static_subnet = [] 288 static6_subnet = [] 289 290 for subnet in subnets: 291 subnet_type = subnet.get('type') 292 if subnet_type == 'static': 293 static_subnet.append(subnet) 294 elif subnet_type == 'static6': 295 static6_subnet.append(subnet) 296 else: 297 self.assertEqual(True, False, 'Unknown type') 298 if 'route' in subnet: 299 for route in subnet.get('routes'): 300 route_list.append(route) 301 302 self.assertEqual(1, len(static_subnet), 'Number of static subnet') 303 self.assertEqual(1, len(static6_subnet), 'Number of static6 subnet') 304 305 subnet = static_subnet[0] 306 self.assertEqual('10.20.87.154', subnet.get('address'), 307 'IPv4 address of static subnet') 308 self.assertEqual('255.255.252.0', subnet.get('netmask'), 309 'NetMask of static subnet') 310 self.assertEqual('auto', subnet.get('control'), 311 'control for static subnet') 312 313 subnet = static6_subnet[0] 314 self.assertEqual('fc00:10:20:87::154', subnet.get('address'), 315 'IPv6 address of static subnet') 316 self.assertEqual('64', subnet.get('netmask'), 317 'NetMask of static6 subnet') 318 319 route_set = set(['10.20.87.253', '10.20.87.105', '192.168.0.10']) 320 for route in route_list: 321 self.assertEqual(10000, route.get('metric'), 'metric of route') 322 gateway = route.get('gateway') 323 if gateway in route_set: 324 route_set.discard(gateway) 325 else: 326 self.assertEqual(True, False, 'invalid gateway %s' % (gateway)) 327 328 self.assertEqual('physical', nic2.get('type'), 'type of NIC2') 329 self.assertEqual('NIC2', nic2.get('name'), 'name of NIC2') 330 self.assertEqual('00:50:56:a6:ef:7d', nic2.get('mac_address'), 331 'mac address of NIC2') 332 333 subnets = nic2.get('subnets') 334 self.assertEqual(1, len(subnets), 'Number of subnets for NIC2') 335 336 subnet = subnets[0] 337 self.assertEqual('static', subnet.get('type'), 'Subnet type') 338 self.assertEqual('192.168.6.102', subnet.get('address'), 339 'Subnet address') 340 self.assertEqual('255.255.0.0', subnet.get('netmask'), 341 'Subnet netmask') 342 343 def test_custom_script(self): 344 cf = ConfigFile("tests/data/vmware/cust-dhcp-2nic.cfg") 345 conf = Config(cf) 346 self.assertIsNone(conf.custom_script_name) 347 cf._insertKey("CUSTOM-SCRIPT|SCRIPT-NAME", "test-script") 348 conf = Config(cf) 349 self.assertEqual("test-script", conf.custom_script_name) 350 351 def test_post_gc_status(self): 352 cf = ConfigFile("tests/data/vmware/cust-dhcp-2nic.cfg") 353 conf = Config(cf) 354 self.assertFalse(conf.post_gc_status) 355 cf._insertKey("MISC|POST-GC-STATUS", "YES") 356 conf = Config(cf) 357 self.assertTrue(conf.post_gc_status) 358 359 def test_no_default_run_post_script(self): 360 cf = ConfigFile("tests/data/vmware/cust-dhcp-2nic.cfg") 361 conf = Config(cf) 362 self.assertFalse(conf.default_run_post_script) 363 cf._insertKey("MISC|DEFAULT-RUN-POST-CUST-SCRIPT", "NO") 364 conf = Config(cf) 365 self.assertFalse(conf.default_run_post_script) 366 367 def test_yes_default_run_post_script(self): 368 cf = ConfigFile("tests/data/vmware/cust-dhcp-2nic.cfg") 369 cf._insertKey("MISC|DEFAULT-RUN-POST-CUST-SCRIPT", "yes") 370 conf = Config(cf) 371 self.assertTrue(conf.default_run_post_script) 372 373 374class TestVmwareNetConfig(CiTestCase): 375 """Test conversion of vmware config to cloud-init config.""" 376 377 maxDiff = None 378 379 def _get_NicConfigurator(self, text): 380 fp = None 381 try: 382 with tempfile.NamedTemporaryFile(mode="w", dir=self.tmp_dir(), 383 delete=False) as fp: 384 fp.write(text) 385 fp.close() 386 cfg = Config(ConfigFile(fp.name)) 387 return NicConfigurator(cfg.nics, use_system_devices=False) 388 finally: 389 if fp: 390 os.unlink(fp.name) 391 392 def test_non_primary_nic_without_gateway(self): 393 """A non primary nic set is not required to have a gateway.""" 394 config = textwrap.dedent("""\ 395 [NETWORK] 396 NETWORKING = yes 397 BOOTPROTO = dhcp 398 HOSTNAME = myhost1 399 DOMAINNAME = eng.vmware.com 400 401 [NIC-CONFIG] 402 NICS = NIC1 403 404 [NIC1] 405 MACADDR = 00:50:56:a6:8c:08 406 ONBOOT = yes 407 IPv4_MODE = BACKWARDS_COMPATIBLE 408 BOOTPROTO = static 409 IPADDR = 10.20.87.154 410 NETMASK = 255.255.252.0 411 """) 412 nc = self._get_NicConfigurator(config) 413 self.assertEqual( 414 [{'type': 'physical', 'name': 'NIC1', 415 'mac_address': '00:50:56:a6:8c:08', 416 'subnets': [ 417 {'control': 'auto', 'type': 'static', 418 'address': '10.20.87.154', 'netmask': '255.255.252.0'}]}], 419 nc.generate()) 420 421 def test_non_primary_nic_with_gateway(self): 422 """A non primary nic set can have a gateway.""" 423 config = textwrap.dedent("""\ 424 [NETWORK] 425 NETWORKING = yes 426 BOOTPROTO = dhcp 427 HOSTNAME = myhost1 428 DOMAINNAME = eng.vmware.com 429 430 [NIC-CONFIG] 431 NICS = NIC1 432 433 [NIC1] 434 MACADDR = 00:50:56:a6:8c:08 435 ONBOOT = yes 436 IPv4_MODE = BACKWARDS_COMPATIBLE 437 BOOTPROTO = static 438 IPADDR = 10.20.87.154 439 NETMASK = 255.255.252.0 440 GATEWAY = 10.20.87.253 441 """) 442 nc = self._get_NicConfigurator(config) 443 self.assertEqual( 444 [{'type': 'physical', 'name': 'NIC1', 445 'mac_address': '00:50:56:a6:8c:08', 446 'subnets': [ 447 {'control': 'auto', 'type': 'static', 448 'address': '10.20.87.154', 'netmask': '255.255.252.0', 449 'routes': 450 [{'type': 'route', 'destination': '10.20.84.0/22', 451 'gateway': '10.20.87.253', 'metric': 10000}]}]}], 452 nc.generate()) 453 454 def test_cust_non_primary_nic_with_gateway_(self): 455 """A customer non primary nic set can have a gateway.""" 456 config = textwrap.dedent("""\ 457 [NETWORK] 458 NETWORKING = yes 459 BOOTPROTO = dhcp 460 HOSTNAME = static-debug-vm 461 DOMAINNAME = cluster.local 462 463 [NIC-CONFIG] 464 NICS = NIC1 465 466 [NIC1] 467 MACADDR = 00:50:56:ac:d1:8a 468 ONBOOT = yes 469 IPv4_MODE = BACKWARDS_COMPATIBLE 470 BOOTPROTO = static 471 IPADDR = 100.115.223.75 472 NETMASK = 255.255.255.0 473 GATEWAY = 100.115.223.254 474 475 476 [DNS] 477 DNSFROMDHCP=no 478 479 NAMESERVER|1 = 8.8.8.8 480 481 [DATETIME] 482 UTC = yes 483 """) 484 nc = self._get_NicConfigurator(config) 485 self.assertEqual( 486 [{'type': 'physical', 'name': 'NIC1', 487 'mac_address': '00:50:56:ac:d1:8a', 488 'subnets': [ 489 {'control': 'auto', 'type': 'static', 490 'address': '100.115.223.75', 'netmask': '255.255.255.0', 491 'routes': 492 [{'type': 'route', 'destination': '100.115.223.0/24', 493 'gateway': '100.115.223.254', 'metric': 10000}]}]}], 494 nc.generate()) 495 496 def test_a_primary_nic_with_gateway(self): 497 """A primary nic set can have a gateway.""" 498 config = textwrap.dedent("""\ 499 [NETWORK] 500 NETWORKING = yes 501 BOOTPROTO = dhcp 502 HOSTNAME = myhost1 503 DOMAINNAME = eng.vmware.com 504 505 [NIC-CONFIG] 506 NICS = NIC1 507 508 [NIC1] 509 MACADDR = 00:50:56:a6:8c:08 510 ONBOOT = yes 511 IPv4_MODE = BACKWARDS_COMPATIBLE 512 BOOTPROTO = static 513 IPADDR = 10.20.87.154 514 NETMASK = 255.255.252.0 515 PRIMARY = true 516 GATEWAY = 10.20.87.253 517 """) 518 nc = self._get_NicConfigurator(config) 519 self.assertEqual( 520 [{'type': 'physical', 'name': 'NIC1', 521 'mac_address': '00:50:56:a6:8c:08', 522 'subnets': [ 523 {'control': 'auto', 'type': 'static', 524 'address': '10.20.87.154', 'netmask': '255.255.252.0', 525 'gateway': '10.20.87.253'}]}], 526 nc.generate()) 527 528 def test_meta_data(self): 529 cf = ConfigFile("tests/data/vmware/cust-dhcp-2nic.cfg") 530 conf = Config(cf) 531 self.assertIsNone(conf.meta_data_name) 532 cf._insertKey("CLOUDINIT|METADATA", "test-metadata") 533 conf = Config(cf) 534 self.assertEqual("test-metadata", conf.meta_data_name) 535 536 def test_user_data(self): 537 cf = ConfigFile("tests/data/vmware/cust-dhcp-2nic.cfg") 538 conf = Config(cf) 539 self.assertIsNone(conf.user_data_name) 540 cf._insertKey("CLOUDINIT|USERDATA", "test-userdata") 541 conf = Config(cf) 542 self.assertEqual("test-userdata", conf.user_data_name) 543 544 545# vi: ts=4 expandtab 546