1# Copyright (C) 2015 Canonical Ltd.
2# Copyright (C) 2016 VMware INC.
3#
4# Author: Sankar Tanguturi <stanguturi@vmware.com>
5#         Pengpeng Sun <pengpengs@vmware.com>
6#
7# This file is part of cloud-init. See LICENSE file for license information.
8
9import logging
10import os
11import sys
12import tempfile
13import textwrap
14
15from cloudinit.sources.DataSourceOVF import get_network_config_from_conf
16from cloudinit.sources.DataSourceOVF import read_vmware_imc
17from cloudinit.sources.helpers.vmware.imc.boot_proto import BootProtoEnum
18from cloudinit.sources.helpers.vmware.imc.config import Config
19from cloudinit.sources.helpers.vmware.imc.config_file import ConfigFile
20from cloudinit.sources.helpers.vmware.imc.config_nic import gen_subnet
21from cloudinit.sources.helpers.vmware.imc.config_nic import NicConfigurator
22from cloudinit.tests.helpers import CiTestCase
23
24logging.basicConfig(level=logging.DEBUG, stream=sys.stdout)
25logger = logging.getLogger(__name__)
26
27
28class TestVmwareConfigFile(CiTestCase):
29
30    def test_utility_methods(self):
31        """Tests basic utility methods of ConfigFile class"""
32        cf = ConfigFile("tests/data/vmware/cust-dhcp-2nic.cfg")
33
34        cf.clear()
35
36        self.assertEqual(0, len(cf), "clear size")
37
38        cf._insertKey("  PASSWORD|-PASS ", "  foo  ")
39        cf._insertKey("BAR", "   ")
40
41        self.assertEqual(2, len(cf), "insert size")
42        self.assertEqual('foo', cf["PASSWORD|-PASS"], "password")
43        self.assertTrue("PASSWORD|-PASS" in cf, "hasPassword")
44        self.assertFalse(cf.should_keep_current_value("PASSWORD|-PASS"),
45                         "keepPassword")
46        self.assertFalse(cf.should_remove_current_value("PASSWORD|-PASS"),
47                         "removePassword")
48        self.assertFalse("FOO" in cf, "hasFoo")
49        self.assertTrue(cf.should_keep_current_value("FOO"), "keepFoo")
50        self.assertFalse(cf.should_remove_current_value("FOO"), "removeFoo")
51        self.assertTrue("BAR" in cf, "hasBar")
52        self.assertFalse(cf.should_keep_current_value("BAR"), "keepBar")
53        self.assertTrue(cf.should_remove_current_value("BAR"), "removeBar")
54
55    def test_datasource_instance_id(self):
56        """Tests instance id for the DatasourceOVF"""
57        cf = ConfigFile("tests/data/vmware/cust-dhcp-2nic.cfg")
58
59        instance_id_prefix = 'iid-vmware-'
60
61        conf = Config(cf)
62
63        (md1, _, _) = read_vmware_imc(conf)
64        self.assertIn(instance_id_prefix, md1["instance-id"])
65        self.assertEqual(md1["instance-id"], 'iid-vmware-imc')
66
67        (md2, _, _) = read_vmware_imc(conf)
68        self.assertIn(instance_id_prefix, md2["instance-id"])
69        self.assertEqual(md2["instance-id"], 'iid-vmware-imc')
70
71        self.assertEqual(md2["instance-id"], md1["instance-id"])
72
73    def test_configfile_static_2nics(self):
74        """Tests Config class for a configuration with two static NICs."""
75        cf = ConfigFile("tests/data/vmware/cust-static-2nic.cfg")
76
77        conf = Config(cf)
78
79        self.assertEqual('myhost1', conf.host_name, "hostName")
80        self.assertEqual('Africa/Abidjan', conf.timezone, "tz")
81        self.assertTrue(conf.utc, "utc")
82
83        self.assertEqual(['10.20.145.1', '10.20.145.2'],
84                         conf.name_servers,
85                         "dns")
86        self.assertEqual(['eng.vmware.com', 'proxy.vmware.com'],
87                         conf.dns_suffixes,
88                         "suffixes")
89
90        nics = conf.nics
91        ipv40 = nics[0].staticIpv4
92
93        self.assertEqual(2, len(nics), "nics")
94        self.assertEqual('NIC1', nics[0].name, "nic0")
95        self.assertEqual('00:50:56:a6:8c:08', nics[0].mac, "mac0")
96        self.assertEqual(BootProtoEnum.STATIC, nics[0].bootProto, "bootproto0")
97        self.assertEqual('10.20.87.154', ipv40[0].ip, "ipv4Addr0")
98        self.assertEqual('255.255.252.0', ipv40[0].netmask, "ipv4Mask0")
99        self.assertEqual(2, len(ipv40[0].gateways), "ipv4Gw0")
100        self.assertEqual('10.20.87.253', ipv40[0].gateways[0], "ipv4Gw0_0")
101        self.assertEqual('10.20.87.105', ipv40[0].gateways[1], "ipv4Gw0_1")
102
103        self.assertEqual(1, len(nics[0].staticIpv6), "ipv6Cnt0")
104        self.assertEqual('fc00:10:20:87::154',
105                         nics[0].staticIpv6[0].ip,
106                         "ipv6Addr0")
107
108        self.assertEqual('NIC2', nics[1].name, "nic1")
109        self.assertTrue(not nics[1].staticIpv6, "ipv61 dhcp")
110
111    def test_config_file_dhcp_2nics(self):
112        """Tests Config class for a configuration with two DHCP NICs."""
113        cf = ConfigFile("tests/data/vmware/cust-dhcp-2nic.cfg")
114
115        conf = Config(cf)
116        nics = conf.nics
117        self.assertEqual(2, len(nics), "nics")
118        self.assertEqual('NIC1', nics[0].name, "nic0")
119        self.assertEqual('00:50:56:a6:8c:08', nics[0].mac, "mac0")
120        self.assertEqual(BootProtoEnum.DHCP, nics[0].bootProto, "bootproto0")
121
122    def test_config_password(self):
123        cf = ConfigFile("tests/data/vmware/cust-dhcp-2nic.cfg")
124
125        cf._insertKey("PASSWORD|-PASS", "test-password")
126        cf._insertKey("PASSWORD|RESET", "no")
127
128        conf = Config(cf)
129        self.assertEqual('test-password', conf.admin_password, "password")
130        self.assertFalse(conf.reset_password, "do not reset password")
131
132    def test_config_reset_passwd(self):
133        cf = ConfigFile("tests/data/vmware/cust-dhcp-2nic.cfg")
134
135        cf._insertKey("PASSWORD|-PASS", "test-password")
136        cf._insertKey("PASSWORD|RESET", "random")
137
138        conf = Config(cf)
139        with self.assertRaises(ValueError):
140            pw = conf.reset_password
141            self.assertIsNone(pw)
142
143        cf.clear()
144        cf._insertKey("PASSWORD|RESET", "yes")
145        self.assertEqual(1, len(cf), "insert size")
146
147        conf = Config(cf)
148        self.assertTrue(conf.reset_password, "reset password")
149
150    def test_get_config_nameservers(self):
151        """Tests DNS and nameserver settings in a configuration."""
152        cf = ConfigFile("tests/data/vmware/cust-static-2nic.cfg")
153
154        config = Config(cf)
155
156        network_config = get_network_config_from_conf(config, False)
157
158        self.assertEqual(1, network_config.get('version'))
159
160        config_types = network_config.get('config')
161        name_servers = None
162        dns_suffixes = None
163
164        for type in config_types:
165            if type.get('type') == 'nameserver':
166                name_servers = type.get('address')
167                dns_suffixes = type.get('search')
168                break
169
170        self.assertEqual(['10.20.145.1', '10.20.145.2'],
171                         name_servers,
172                         "dns")
173        self.assertEqual(['eng.vmware.com', 'proxy.vmware.com'],
174                         dns_suffixes,
175                         "suffixes")
176
177    def test_gen_subnet(self):
178        """Tests if gen_subnet properly calculates network subnet from
179           IPv4 address and netmask"""
180        ip_subnet_list = [['10.20.87.253', '255.255.252.0', '10.20.84.0'],
181                          ['10.20.92.105', '255.255.252.0', '10.20.92.0'],
182                          ['192.168.0.10', '255.255.0.0', '192.168.0.0']]
183        for entry in ip_subnet_list:
184            self.assertEqual(entry[2], gen_subnet(entry[0], entry[1]),
185                             "Subnet for a specified ip and netmask")
186
187    def test_get_config_dns_suffixes(self):
188        """Tests if get_network_config_from_conf properly
189           generates nameservers and dns settings from a
190           specified configuration"""
191        cf = ConfigFile("tests/data/vmware/cust-dhcp-2nic.cfg")
192
193        config = Config(cf)
194
195        network_config = get_network_config_from_conf(config, False)
196
197        self.assertEqual(1, network_config.get('version'))
198
199        config_types = network_config.get('config')
200        name_servers = None
201        dns_suffixes = None
202
203        for type in config_types:
204            if type.get('type') == 'nameserver':
205                name_servers = type.get('address')
206                dns_suffixes = type.get('search')
207                break
208
209        self.assertEqual([],
210                         name_servers,
211                         "dns")
212        self.assertEqual(['eng.vmware.com'],
213                         dns_suffixes,
214                         "suffixes")
215
216    def test_get_nics_list_dhcp(self):
217        """Tests if NicConfigurator properly calculates network subnets
218           for a configuration with a list of DHCP NICs"""
219        cf = ConfigFile("tests/data/vmware/cust-dhcp-2nic.cfg")
220
221        config = Config(cf)
222
223        nicConfigurator = NicConfigurator(config.nics, False)
224        nics_cfg_list = nicConfigurator.generate()
225
226        self.assertEqual(2, len(nics_cfg_list), "number of config elements")
227
228        nic1 = {'name': 'NIC1'}
229        nic2 = {'name': 'NIC2'}
230        for cfg in nics_cfg_list:
231            if cfg.get('name') == nic1.get('name'):
232                nic1.update(cfg)
233            elif cfg.get('name') == nic2.get('name'):
234                nic2.update(cfg)
235
236        self.assertEqual('physical', nic1.get('type'), 'type of NIC1')
237        self.assertEqual('NIC1', nic1.get('name'), 'name of NIC1')
238        self.assertEqual('00:50:56:a6:8c:08', nic1.get('mac_address'),
239                         'mac address of NIC1')
240        subnets = nic1.get('subnets')
241        self.assertEqual(1, len(subnets), 'number of subnets for NIC1')
242        subnet = subnets[0]
243        self.assertEqual('dhcp', subnet.get('type'), 'DHCP type for NIC1')
244        self.assertEqual('auto', subnet.get('control'), 'NIC1 Control type')
245
246        self.assertEqual('physical', nic2.get('type'), 'type of NIC2')
247        self.assertEqual('NIC2', nic2.get('name'), 'name of NIC2')
248        self.assertEqual('00:50:56:a6:5a:de', nic2.get('mac_address'),
249                         'mac address of NIC2')
250        subnets = nic2.get('subnets')
251        self.assertEqual(1, len(subnets), 'number of subnets for NIC2')
252        subnet = subnets[0]
253        self.assertEqual('dhcp', subnet.get('type'), 'DHCP type for NIC2')
254        self.assertEqual('auto', subnet.get('control'), 'NIC2 Control type')
255
256    def test_get_nics_list_static(self):
257        """Tests if NicConfigurator properly calculates network subnets
258           for a configuration with 2 static NICs"""
259        cf = ConfigFile("tests/data/vmware/cust-static-2nic.cfg")
260
261        config = Config(cf)
262
263        nicConfigurator = NicConfigurator(config.nics, False)
264        nics_cfg_list = nicConfigurator.generate()
265
266        self.assertEqual(2, len(nics_cfg_list), "number of elements")
267
268        nic1 = {'name': 'NIC1'}
269        nic2 = {'name': 'NIC2'}
270        route_list = []
271        for cfg in nics_cfg_list:
272            cfg_type = cfg.get('type')
273            if cfg_type == 'physical':
274                if cfg.get('name') == nic1.get('name'):
275                    nic1.update(cfg)
276                elif cfg.get('name') == nic2.get('name'):
277                    nic2.update(cfg)
278
279        self.assertEqual('physical', nic1.get('type'), 'type of NIC1')
280        self.assertEqual('NIC1', nic1.get('name'), 'name of NIC1')
281        self.assertEqual('00:50:56:a6:8c:08', nic1.get('mac_address'),
282                         'mac address of NIC1')
283
284        subnets = nic1.get('subnets')
285        self.assertEqual(2, len(subnets), 'Number of subnets')
286
287        static_subnet = []
288        static6_subnet = []
289
290        for subnet in subnets:
291            subnet_type = subnet.get('type')
292            if subnet_type == 'static':
293                static_subnet.append(subnet)
294            elif subnet_type == 'static6':
295                static6_subnet.append(subnet)
296            else:
297                self.assertEqual(True, False, 'Unknown type')
298            if 'route' in subnet:
299                for route in subnet.get('routes'):
300                    route_list.append(route)
301
302        self.assertEqual(1, len(static_subnet), 'Number of static subnet')
303        self.assertEqual(1, len(static6_subnet), 'Number of static6 subnet')
304
305        subnet = static_subnet[0]
306        self.assertEqual('10.20.87.154', subnet.get('address'),
307                         'IPv4 address of static subnet')
308        self.assertEqual('255.255.252.0', subnet.get('netmask'),
309                         'NetMask of static subnet')
310        self.assertEqual('auto', subnet.get('control'),
311                         'control for static subnet')
312
313        subnet = static6_subnet[0]
314        self.assertEqual('fc00:10:20:87::154', subnet.get('address'),
315                         'IPv6 address of static subnet')
316        self.assertEqual('64', subnet.get('netmask'),
317                         'NetMask of static6 subnet')
318
319        route_set = set(['10.20.87.253', '10.20.87.105', '192.168.0.10'])
320        for route in route_list:
321            self.assertEqual(10000, route.get('metric'), 'metric of route')
322            gateway = route.get('gateway')
323            if gateway in route_set:
324                route_set.discard(gateway)
325            else:
326                self.assertEqual(True, False, 'invalid gateway %s' % (gateway))
327
328        self.assertEqual('physical', nic2.get('type'), 'type of NIC2')
329        self.assertEqual('NIC2', nic2.get('name'), 'name of NIC2')
330        self.assertEqual('00:50:56:a6:ef:7d', nic2.get('mac_address'),
331                         'mac address of NIC2')
332
333        subnets = nic2.get('subnets')
334        self.assertEqual(1, len(subnets), 'Number of subnets for NIC2')
335
336        subnet = subnets[0]
337        self.assertEqual('static', subnet.get('type'), 'Subnet type')
338        self.assertEqual('192.168.6.102', subnet.get('address'),
339                         'Subnet address')
340        self.assertEqual('255.255.0.0', subnet.get('netmask'),
341                         'Subnet netmask')
342
343    def test_custom_script(self):
344        cf = ConfigFile("tests/data/vmware/cust-dhcp-2nic.cfg")
345        conf = Config(cf)
346        self.assertIsNone(conf.custom_script_name)
347        cf._insertKey("CUSTOM-SCRIPT|SCRIPT-NAME", "test-script")
348        conf = Config(cf)
349        self.assertEqual("test-script", conf.custom_script_name)
350
351    def test_post_gc_status(self):
352        cf = ConfigFile("tests/data/vmware/cust-dhcp-2nic.cfg")
353        conf = Config(cf)
354        self.assertFalse(conf.post_gc_status)
355        cf._insertKey("MISC|POST-GC-STATUS", "YES")
356        conf = Config(cf)
357        self.assertTrue(conf.post_gc_status)
358
359    def test_no_default_run_post_script(self):
360        cf = ConfigFile("tests/data/vmware/cust-dhcp-2nic.cfg")
361        conf = Config(cf)
362        self.assertFalse(conf.default_run_post_script)
363        cf._insertKey("MISC|DEFAULT-RUN-POST-CUST-SCRIPT", "NO")
364        conf = Config(cf)
365        self.assertFalse(conf.default_run_post_script)
366
367    def test_yes_default_run_post_script(self):
368        cf = ConfigFile("tests/data/vmware/cust-dhcp-2nic.cfg")
369        cf._insertKey("MISC|DEFAULT-RUN-POST-CUST-SCRIPT", "yes")
370        conf = Config(cf)
371        self.assertTrue(conf.default_run_post_script)
372
373
374class TestVmwareNetConfig(CiTestCase):
375    """Test conversion of vmware config to cloud-init config."""
376
377    maxDiff = None
378
379    def _get_NicConfigurator(self, text):
380        fp = None
381        try:
382            with tempfile.NamedTemporaryFile(mode="w", dir=self.tmp_dir(),
383                                             delete=False) as fp:
384                fp.write(text)
385                fp.close()
386            cfg = Config(ConfigFile(fp.name))
387            return NicConfigurator(cfg.nics, use_system_devices=False)
388        finally:
389            if fp:
390                os.unlink(fp.name)
391
392    def test_non_primary_nic_without_gateway(self):
393        """A non primary nic set is not required to have a gateway."""
394        config = textwrap.dedent("""\
395            [NETWORK]
396            NETWORKING = yes
397            BOOTPROTO = dhcp
398            HOSTNAME = myhost1
399            DOMAINNAME = eng.vmware.com
400
401            [NIC-CONFIG]
402            NICS = NIC1
403
404            [NIC1]
405            MACADDR = 00:50:56:a6:8c:08
406            ONBOOT = yes
407            IPv4_MODE = BACKWARDS_COMPATIBLE
408            BOOTPROTO = static
409            IPADDR = 10.20.87.154
410            NETMASK = 255.255.252.0
411            """)
412        nc = self._get_NicConfigurator(config)
413        self.assertEqual(
414            [{'type': 'physical', 'name': 'NIC1',
415              'mac_address': '00:50:56:a6:8c:08',
416              'subnets': [
417                  {'control': 'auto', 'type': 'static',
418                   'address': '10.20.87.154', 'netmask': '255.255.252.0'}]}],
419            nc.generate())
420
421    def test_non_primary_nic_with_gateway(self):
422        """A non primary nic set can have a gateway."""
423        config = textwrap.dedent("""\
424            [NETWORK]
425            NETWORKING = yes
426            BOOTPROTO = dhcp
427            HOSTNAME = myhost1
428            DOMAINNAME = eng.vmware.com
429
430            [NIC-CONFIG]
431            NICS = NIC1
432
433            [NIC1]
434            MACADDR = 00:50:56:a6:8c:08
435            ONBOOT = yes
436            IPv4_MODE = BACKWARDS_COMPATIBLE
437            BOOTPROTO = static
438            IPADDR = 10.20.87.154
439            NETMASK = 255.255.252.0
440            GATEWAY = 10.20.87.253
441            """)
442        nc = self._get_NicConfigurator(config)
443        self.assertEqual(
444            [{'type': 'physical', 'name': 'NIC1',
445              'mac_address': '00:50:56:a6:8c:08',
446              'subnets': [
447                  {'control': 'auto', 'type': 'static',
448                   'address': '10.20.87.154', 'netmask': '255.255.252.0',
449                   'routes':
450                       [{'type': 'route', 'destination': '10.20.84.0/22',
451                         'gateway': '10.20.87.253', 'metric': 10000}]}]}],
452            nc.generate())
453
454    def test_cust_non_primary_nic_with_gateway_(self):
455        """A customer non primary nic set can have a gateway."""
456        config = textwrap.dedent("""\
457            [NETWORK]
458            NETWORKING = yes
459            BOOTPROTO = dhcp
460            HOSTNAME = static-debug-vm
461            DOMAINNAME = cluster.local
462
463            [NIC-CONFIG]
464            NICS = NIC1
465
466            [NIC1]
467            MACADDR = 00:50:56:ac:d1:8a
468            ONBOOT = yes
469            IPv4_MODE = BACKWARDS_COMPATIBLE
470            BOOTPROTO = static
471            IPADDR = 100.115.223.75
472            NETMASK = 255.255.255.0
473            GATEWAY = 100.115.223.254
474
475
476            [DNS]
477            DNSFROMDHCP=no
478
479            NAMESERVER|1 = 8.8.8.8
480
481            [DATETIME]
482            UTC = yes
483            """)
484        nc = self._get_NicConfigurator(config)
485        self.assertEqual(
486            [{'type': 'physical', 'name': 'NIC1',
487              'mac_address': '00:50:56:ac:d1:8a',
488              'subnets': [
489                  {'control': 'auto', 'type': 'static',
490                   'address': '100.115.223.75', 'netmask': '255.255.255.0',
491                   'routes':
492                       [{'type': 'route', 'destination': '100.115.223.0/24',
493                         'gateway': '100.115.223.254', 'metric': 10000}]}]}],
494            nc.generate())
495
496    def test_a_primary_nic_with_gateway(self):
497        """A primary nic set can have a gateway."""
498        config = textwrap.dedent("""\
499            [NETWORK]
500            NETWORKING = yes
501            BOOTPROTO = dhcp
502            HOSTNAME = myhost1
503            DOMAINNAME = eng.vmware.com
504
505            [NIC-CONFIG]
506            NICS = NIC1
507
508            [NIC1]
509            MACADDR = 00:50:56:a6:8c:08
510            ONBOOT = yes
511            IPv4_MODE = BACKWARDS_COMPATIBLE
512            BOOTPROTO = static
513            IPADDR = 10.20.87.154
514            NETMASK = 255.255.252.0
515            PRIMARY = true
516            GATEWAY = 10.20.87.253
517            """)
518        nc = self._get_NicConfigurator(config)
519        self.assertEqual(
520            [{'type': 'physical', 'name': 'NIC1',
521              'mac_address': '00:50:56:a6:8c:08',
522              'subnets': [
523                  {'control': 'auto', 'type': 'static',
524                   'address': '10.20.87.154', 'netmask': '255.255.252.0',
525                   'gateway': '10.20.87.253'}]}],
526            nc.generate())
527
528    def test_meta_data(self):
529        cf = ConfigFile("tests/data/vmware/cust-dhcp-2nic.cfg")
530        conf = Config(cf)
531        self.assertIsNone(conf.meta_data_name)
532        cf._insertKey("CLOUDINIT|METADATA", "test-metadata")
533        conf = Config(cf)
534        self.assertEqual("test-metadata", conf.meta_data_name)
535
536    def test_user_data(self):
537        cf = ConfigFile("tests/data/vmware/cust-dhcp-2nic.cfg")
538        conf = Config(cf)
539        self.assertIsNone(conf.user_data_name)
540        cf._insertKey("CLOUDINIT|USERDATA", "test-userdata")
541        conf = Config(cf)
542        self.assertEqual("test-userdata", conf.user_data_name)
543
544
545# vi: ts=4 expandtab
546