1# This file is part of cloud-init. See LICENSE file for license information.
2
3from cloudinit.helpers import Paths
4from cloudinit.sources import DataSourceIBMCloud as ibm
5from cloudinit.tests import helpers as test_helpers
6from cloudinit import util
7
8import base64
9import copy
10import json
11from textwrap import dedent
12
# Reuse the mock module exposed by the shared cloud-init test helpers.
mock = test_helpers.mock

# Dotted prefix of the module under test; the test classes below prepend it
# to attribute names when building mock.patch targets.
D_PATH = "cloudinit.sources.DataSourceIBMCloud."
16
17
@mock.patch(D_PATH + "_is_xen", return_value=True)
@mock.patch(D_PATH + "_is_ibm_provisioning")
@mock.patch(D_PATH + "util.blkid")
class TestGetIBMPlatform(test_helpers.CiTestCase):
    """Test the get_ibm_platform helper.

    The class-level patches hand every test method three mocks, innermost
    decorator first: util.blkid, _is_ibm_provisioning and _is_xen (the
    last one always returning True).
    """

    # blkid entries shared by every scenario: the boot and root partitions.
    blkid_base = {
        "/dev/xvda1": {
            "DEVNAME": "/dev/xvda1", "LABEL": "cloudimg-bootfs",
            "TYPE": "ext3"},
        "/dev/xvda2": {
            "DEVNAME": "/dev/xvda2", "LABEL": "cloudimg-rootfs",
            "TYPE": "ext4"},
    }

    # Extra blkid entry for a disk labelled METADATA.
    blkid_metadata_disk = {
        "/dev/xvdh1": {
            "DEVNAME": "/dev/xvdh1", "LABEL": "METADATA", "TYPE": "vfat",
            "SEC_TYPE": "msdos", "UUID": "681B-8C5D",
            "PARTUUID": "3d631e09-01"},
    }

    # Extra blkid entry for a config-2 disk carrying the IBM config UUID.
    blkid_oscode_disk = {
        "/dev/xvdh": {
            "DEVNAME": "/dev/xvdh", "LABEL": "config-2", "TYPE": "vfat",
            "SEC_TYPE": "msdos", "UUID": ibm.IBM_CONFIG_UUID}
    }

    def setUp(self):
        """Build per-test deep copies of the blkid fixtures.

        Deep copies keep a test that mutates its blkid data (see
        test_id_os_code_must_match_uuid) from leaking changes into the
        class-level dictionaries used by the other tests.
        """
        # Chain to the base class so whatever CiTestCase.setUp prepares
        # still runs, as TestDataSourceIBMCloud.setUp does in this file.
        super(TestGetIBMPlatform, self).setUp()

        self.blkid_metadata = copy.deepcopy(self.blkid_base)
        self.blkid_metadata.update(copy.deepcopy(self.blkid_metadata_disk))

        self.blkid_oscode = copy.deepcopy(self.blkid_base)
        self.blkid_oscode.update(copy.deepcopy(self.blkid_oscode_disk))

    def test_id_template_live_metadata(self, m_blkid, m_is_prov, _m_xen):
        """identify TEMPLATE_LIVE_METADATA."""
        m_blkid.return_value = self.blkid_metadata
        m_is_prov.return_value = False
        self.assertEqual(
            (ibm.Platforms.TEMPLATE_LIVE_METADATA, "/dev/xvdh1"),
            ibm.get_ibm_platform())

    def test_id_template_prov_metadata(self, m_blkid, m_is_prov, _m_xen):
        """identify TEMPLATE_PROVISIONING_METADATA."""
        m_blkid.return_value = self.blkid_metadata
        m_is_prov.return_value = True
        self.assertEqual(
            (ibm.Platforms.TEMPLATE_PROVISIONING_METADATA, "/dev/xvdh1"),
            ibm.get_ibm_platform())

    def test_id_template_prov_nodata(self, m_blkid, m_is_prov, _m_xen):
        """identify TEMPLATE_PROVISIONING_NODATA."""
        m_blkid.return_value = self.blkid_base
        m_is_prov.return_value = True
        self.assertEqual(
            (ibm.Platforms.TEMPLATE_PROVISIONING_NODATA, None),
            ibm.get_ibm_platform())

    def test_id_os_code(self, m_blkid, m_is_prov, _m_xen):
        """Identify OS_CODE."""
        m_blkid.return_value = self.blkid_oscode
        m_is_prov.return_value = False
        self.assertEqual((ibm.Platforms.OS_CODE, "/dev/xvdh"),
                         ibm.get_ibm_platform())

    def test_id_os_code_must_match_uuid(self, m_blkid, m_is_prov, _m_xen):
        """Test against false positive on openstack with non-ibm UUID."""
        # Safe to mutate: setUp gave this test its own deep copy.
        blkid = self.blkid_oscode
        blkid["/dev/xvdh"]["UUID"] = "9999-9999"
        m_blkid.return_value = blkid
        m_is_prov.return_value = False
        self.assertEqual((None, None), ibm.get_ibm_platform())
91
92
@mock.patch(D_PATH + "_read_system_uuid", return_value=None)
@mock.patch(D_PATH + "get_ibm_platform")
class TestReadMD(test_helpers.CiTestCase):
    """Test the read_md helper.

    The class-level patches hand every test method two mocks, innermost
    decorator first: get_ibm_platform and _read_system_uuid (which returns
    None unless a test overrides it).
    """

    # meta_data.json content used for the template (METADATA disk) case.
    template_md = {
        "files": [],
        "network_config": {"content_path": "/content/interfaces"},
        "hostname": "ci-fond-ram",
        "name": "ci-fond-ram",
        "domain": "testing.ci.cloud-init.org",
        "meta": {"dsmode": "net"},
        "uuid": "8e636730-9f5d-c4a5-327c-d7123c46e82f",
        "public_keys": {"1091307": "ssh-rsa AAAAB3NzaC1...Hw== ci-pubkey"},
    }

    # meta_data.json content used for the OS_CODE (config-2 disk) case.
    oscode_md = {
        "hostname": "ci-grand-gannet.testing.ci.cloud-init.org",
        "name": "ci-grand-gannet",
        "uuid": "2f266908-8e6c-4818-9b5c-42e9cc66a785",
        "random_seed": "bm90LXJhbmRvbQo=",
        "crypt_key": "ssh-rsa AAAAB3NzaC1yc2..n6z/",
        "configuration_token": "eyJhbGciOi..M3ZA",
        "public_keys": {"1091307": "ssh-rsa AAAAB3N..Hw== ci-pubkey"},
    }

    content_interfaces = dedent("""\
        auto lo
        iface lo inet loopback

        auto eth0
        allow-hotplug eth0
        iface eth0 inet static
        address 10.82.43.5
        netmask 255.255.255.192
        """)

    userdata = b"#!/bin/sh\necho hi mom\n"
    # meta.js file gets json encoded userdata as a list.  Derive it with
    # json.dumps so the newlines are escaped and the fixture is valid JSON.
    meta_js = json.dumps([userdata.decode("utf-8")])
    vendor_data = {
        "cloud-init": "#!/bin/bash\necho 'root:$6$5ab01p1m1' | chpasswd -e"}

    network_data = {
        "links": [
            {"id": "interface_29402281", "name": "eth0", "mtu": None,
             "type": "phy", "ethernet_mac_address": "06:00:f1:bd:da:25"},
            {"id": "interface_29402279", "name": "eth1", "mtu": None,
             "type": "phy", "ethernet_mac_address": "06:98:5e:d0:7f:86"}
        ],
        "networks": [
            {"id": "network_109887563", "link": "interface_29402281",
             "type": "ipv4", "ip_address": "10.82.43.2",
             "netmask": "255.255.255.192",
             "routes": [
                 {"network": "10.0.0.0", "netmask": "255.0.0.0",
                  "gateway": "10.82.43.1"},
                 {"network": "161.26.0.0", "netmask": "255.255.0.0",
                  "gateway": "10.82.43.1"}]},
            {"id": "network_109887551", "link": "interface_29402279",
             "type": "ipv4", "ip_address": "108.168.194.252",
             "netmask": "255.255.255.248",
             "routes": [
                 {"network": "0.0.0.0", "netmask": "0.0.0.0",
                  "gateway": "108.168.194.249"}]}
        ],
        "services": [
            {"type": "dns", "address": "10.0.80.11"},
            {"type": "dns", "address": "10.0.80.12"}
        ],
    }

    sysuuid = '7f79ebf5-d791-43c3-a723-854e8389d59f'

    def _get_expected_metadata(self, os_md):
        """return expected 'metadata' for data loaded from meta_data.json.

        Only the renamed keys below and a base64-decoded 'random_seed' are
        carried over; every other key in os_md is dropped.
        """
        os_md = copy.deepcopy(os_md)
        renames = (
            ('hostname', 'local-hostname'),
            ('uuid', 'instance-id'),
            ('public_keys', 'public-keys'))
        ret = {}
        for osname, mdname in renames:
            if osname in os_md:
                ret[mdname] = os_md[osname]
        if 'random_seed' in os_md:
            ret['random_seed'] = base64.b64decode(os_md['random_seed'])

        return ret

    def test_provisioning_md(self, m_platform, _m_sysuuid):
        """Provisioning env with a metadata disk should return None."""
        m_platform.return_value = (
            ibm.Platforms.TEMPLATE_PROVISIONING_METADATA, "/dev/xvdh")
        self.assertIsNone(ibm.read_md())

    def test_provisioning_no_metadata(self, m_platform, _m_sysuuid):
        """Provisioning env with no metadata disk should return None."""
        m_platform.return_value = (
            ibm.Platforms.TEMPLATE_PROVISIONING_NODATA, None)
        self.assertIsNone(ibm.read_md())

    def test_provisioning_not_ibm(self, m_platform, _m_sysuuid):
        """Provisioning env but not identified as IBM should return None."""
        m_platform.return_value = (None, None)
        self.assertIsNone(ibm.read_md())

    def test_template_live(self, m_platform, m_sysuuid):
        """Template live environment should be identified."""
        tmpdir = self.tmp_dir()
        # The populated directory stands in for the metadata source.
        m_platform.return_value = (
            ibm.Platforms.TEMPLATE_LIVE_METADATA, tmpdir)
        m_sysuuid.return_value = self.sysuuid

        test_helpers.populate_dir(tmpdir, {
            'openstack/latest/meta_data.json': json.dumps(self.template_md),
            'openstack/latest/user_data': self.userdata,
            'openstack/content/interfaces': self.content_interfaces,
            'meta.js': self.meta_js})

        ret = ibm.read_md()
        self.assertEqual(ibm.Platforms.TEMPLATE_LIVE_METADATA,
                         ret['platform'])
        self.assertEqual(tmpdir, ret['source'])
        self.assertEqual(self.userdata, ret['userdata'])
        self.assertEqual(self._get_expected_metadata(self.template_md),
                         ret['metadata'])
        self.assertEqual(self.sysuuid, ret['system-uuid'])

    def test_os_code_live(self, m_platform, _m_sysuuid):
        """Verify an os_code metadata path."""
        tmpdir = self.tmp_dir()
        m_platform.return_value = (ibm.Platforms.OS_CODE, tmpdir)
        netdata = json.dumps(self.network_data)
        test_helpers.populate_dir(tmpdir, {
            'openstack/latest/meta_data.json': json.dumps(self.oscode_md),
            'openstack/latest/user_data': self.userdata,
            'openstack/latest/vendor_data.json': json.dumps(self.vendor_data),
            'openstack/latest/network_data.json': netdata,
        })

        ret = ibm.read_md()
        self.assertEqual(ibm.Platforms.OS_CODE, ret['platform'])
        self.assertEqual(tmpdir, ret['source'])
        self.assertEqual(self.userdata, ret['userdata'])
        self.assertEqual(self._get_expected_metadata(self.oscode_md),
                         ret['metadata'])

    def test_os_code_live_no_userdata(self, m_platform, _m_sysuuid):
        """Verify os_code without user-data."""
        tmpdir = self.tmp_dir()
        m_platform.return_value = (ibm.Platforms.OS_CODE, tmpdir)
        test_helpers.populate_dir(tmpdir, {
            'openstack/latest/meta_data.json': json.dumps(self.oscode_md),
            'openstack/latest/vendor_data.json': json.dumps(self.vendor_data),
        })

        ret = ibm.read_md()
        self.assertEqual(ibm.Platforms.OS_CODE, ret['platform'])
        self.assertEqual(tmpdir, ret['source'])
        self.assertIsNone(ret['userdata'])
        self.assertEqual(self._get_expected_metadata(self.oscode_md),
                         ret['metadata'])
256
257
class TestIsIBMProvisioning(test_helpers.FilesystemMockingTestCase):
    """Exercise _is_ibm_provisioning against a re-rooted temporary tree."""
    inst_log = "/root/swinstall.log"
    prov_cfg = "/root/provisioningConfiguration.cfg"
    boot_ref = "/proc/1/environ"
    with_logs = True

    def _call_with_root(self, rootd):
        """Call self.reRoot(rootd), then return _is_ibm_provisioning()."""
        self.reRoot(rootd)
        outcome = ibm._is_ibm_provisioning()
        return outcome

    def _tree_with_log_offset(self, log_offset):
        """Return a new tmp dir holding config, log and boot reference.

        The config is written with timestamp offset -10, the boot reference
        with 0, and the install log with the given log_offset.
        """
        tree = self.tmp_dir()
        stamped = {self.prov_cfg: ("key=value\nkey2=val2\n", -10),
                   self.inst_log: ("log data\n", log_offset),
                   self.boot_ref: ("PWD=/", 0)}
        test_helpers.populate_dir_with_ts(tree, stamped)
        return tree

    def test_no_config(self):
        """Without a provisioning config the answer is False."""
        empty_tree = self.tmp_dir()
        self.assertFalse(self._call_with_root(empty_tree))

    def test_config_only(self):
        """A provisioning config and no install log gives True."""
        tree = self.tmp_dir()
        contents = {self.prov_cfg: "key=value"}
        test_helpers.populate_dir(tree, contents)
        self.assertTrue(self._call_with_root(tree))

    def test_config_with_old_log(self):
        """An install log older than the boot reference gives False."""
        tree = self._tree_with_log_offset(-30)
        self.assertFalse(self._call_with_root(rootd=tree))
        captured = self.logs.getvalue()
        self.assertIn("from previous boot", captured)

    def test_config_with_new_log(self):
        """An install log newer than the boot reference gives True."""
        tree = self._tree_with_log_offset(30)
        self.assertTrue(self._call_with_root(rootd=tree))
        captured = self.logs.getvalue()
        self.assertIn("from current boot", captured)

    def test_config_and_log_no_reference(self):
        """Config and log present but no boot reference gives False."""
        tree = self.tmp_dir()
        contents = {self.prov_cfg: "key=value", self.inst_log: "log data\n"}
        test_helpers.populate_dir(tree, contents)
        self.assertFalse(self._call_with_root(rootd=tree))
        captured = self.logs.getvalue()
        self.assertIn("no reference file", captured)
306
307
class TestDataSourceIBMCloud(test_helpers.CiTestCase):
    """Test get_data on a DataSourceIBMCloud instance."""

    def setUp(self):
        """Create a datasource whose run and cloud dirs sit in a tmp dir."""
        super(TestDataSourceIBMCloud, self).setUp()
        self.tmp = self.tmp_dir()
        self.cloud_dir = self.tmp_path('cloud', dir=self.tmp)
        util.ensure_dir(self.cloud_dir)
        self.ds = ibm.DataSourceIBMCloud(
            sys_cfg={},
            distro=None,
            paths=Paths({'run_dir': self.tmp, 'cloud_dir': self.cloud_dir}))

    def test_get_data_false(self):
        """get_data is falsy when read_md hands back None."""
        no_md = mock.patch(D_PATH + 'read_md', return_value=None)
        with no_md:
            self.assertFalse(self.ds.get_data())

    def test_get_data_processes_read_md(self):
        """get_data stores each piece of read_md's result on the datasource."""
        fake_md = {
            'metadata': {},
            'networkdata': 'net',
            'platform': 'plat',
            'source': 'src',
            'system-uuid': 'uuid',
            'userdata': 'ud',
            'vendordata': 'vd',
        }
        with mock.patch(D_PATH + 'read_md', return_value=fake_md):
            self.assertTrue(self.ds.get_data())

        # (attribute name, expected value), checked in this exact order.
        expectations = (
            ('source', 'src'),
            ('platform', 'plat'),
            ('metadata', {}),
            ('userdata_raw', 'ud'),
            ('network_json', 'net'),
            ('vendordata_pure', 'vd'),
            ('system_uuid', 'uuid'),
            ('cloud_name', 'ibmcloud'),
            ('platform_type', 'ibmcloud'),
            ('subplatform', 'plat (src)'),
        )
        for attr_name, wanted in expectations:
            self.assertEqual(wanted, getattr(self.ds, attr_name))
342
343# vi: ts=4 expandtab
344