1# This file is part of cloud-init. See LICENSE file for license information.
2
3import copy
4import inspect
5import os
6import stat
7
8from cloudinit.event import EventScope, EventType
9from cloudinit.helpers import Paths
10from cloudinit import importer
11from cloudinit.sources import (
12    EXPERIMENTAL_TEXT, INSTANCE_JSON_FILE, INSTANCE_JSON_SENSITIVE_FILE,
13    METADATA_UNKNOWN, REDACT_SENSITIVE_VALUE, UNSET, DataSource,
14    canonical_cloud_id, redact_sensitive_keys)
15from cloudinit.tests.helpers import CiTestCase, mock
16from cloudinit.user_data import UserDataProcessor
17from cloudinit import util
18
19
class DataSourceTestSubclassNet(DataSource):
    """DataSource subclass used as a configurable fixture by these tests.

    _get_data fills metadata and userdata_raw from the values given to the
    constructor, falling back to canned defaults when they are not provided,
    and returns the configured get_data_retval.
    """

    dsname = 'MyTestSubclass'
    url_max_wait = 55

    def __init__(self, sys_cfg, distro, paths, custom_metadata=None,
                 custom_userdata=None, get_data_retval=True):
        super().__init__(sys_cfg, distro, paths)
        self._custom_metadata = custom_metadata
        self._custom_userdata = custom_userdata
        self._get_data_retval = get_data_retval

    def _get_cloud_name(self):
        return 'SubclassCloudName'

    def _get_data(self):
        default_metadata = {'availability_zone': 'myaz',
                            'local-hostname': 'test-subclass-hostname',
                            'region': 'myregion'}
        # Falsy custom values (None, {}, '') fall back to the defaults.
        self.metadata = self._custom_metadata or default_metadata
        self.userdata_raw = self._custom_userdata or 'userdata_raw'
        self.vendordata_raw = 'vendordata_raw'
        return self._get_data_retval
49
50
class InvalidDataSourceTestSubclassNet(DataSource):
    """DataSource subclass which does not define its own _get_data."""
53
54
55class TestDataSource(CiTestCase):
56
57    with_logs = True
58    maxDiff = None
59
60    def setUp(self):
61        super(TestDataSource, self).setUp()
62        self.sys_cfg = {'datasource': {'_undef': {'key1': False}}}
63        self.distro = 'distrotest'  # generally should be a Distro object
64        self.paths = Paths({})
65        self.datasource = DataSource(self.sys_cfg, self.distro, self.paths)
66
67    def test_datasource_init(self):
68        """DataSource initializes metadata attributes, ds_cfg and ud_proc."""
69        self.assertEqual(self.paths, self.datasource.paths)
70        self.assertEqual(self.sys_cfg, self.datasource.sys_cfg)
71        self.assertEqual(self.distro, self.datasource.distro)
72        self.assertIsNone(self.datasource.userdata)
73        self.assertEqual({}, self.datasource.metadata)
74        self.assertIsNone(self.datasource.userdata_raw)
75        self.assertIsNone(self.datasource.vendordata)
76        self.assertIsNone(self.datasource.vendordata_raw)
77        self.assertEqual({'key1': False}, self.datasource.ds_cfg)
78        self.assertIsInstance(self.datasource.ud_proc, UserDataProcessor)
79
80    def test_datasource_init_gets_ds_cfg_using_dsname(self):
81        """Init uses DataSource.dsname for sourcing ds_cfg."""
82        sys_cfg = {'datasource': {'MyTestSubclass': {'key2': False}}}
83        distro = 'distrotest'  # generally should be a Distro object
84        datasource = DataSourceTestSubclassNet(sys_cfg, distro, self.paths)
85        self.assertEqual({'key2': False}, datasource.ds_cfg)
86
87    def test_str_is_classname(self):
88        """The string representation of the datasource is the classname."""
89        self.assertEqual('DataSource', str(self.datasource))
90        self.assertEqual(
91            'DataSourceTestSubclassNet',
92            str(DataSourceTestSubclassNet('', '', self.paths)))
93
94    def test_datasource_get_url_params_defaults(self):
95        """get_url_params default url config settings for the datasource."""
96        params = self.datasource.get_url_params()
97        self.assertEqual(params.max_wait_seconds, self.datasource.url_max_wait)
98        self.assertEqual(params.timeout_seconds, self.datasource.url_timeout)
99        self.assertEqual(params.num_retries, self.datasource.url_retries)
100        self.assertEqual(params.sec_between_retries,
101                         self.datasource.url_sec_between_retries)
102
103    def test_datasource_get_url_params_subclassed(self):
104        """Subclasses can override get_url_params defaults."""
105        sys_cfg = {'datasource': {'MyTestSubclass': {'key2': False}}}
106        distro = 'distrotest'  # generally should be a Distro object
107        datasource = DataSourceTestSubclassNet(sys_cfg, distro, self.paths)
108        expected = (datasource.url_max_wait, datasource.url_timeout,
109                    datasource.url_retries, datasource.url_sec_between_retries)
110        url_params = datasource.get_url_params()
111        self.assertNotEqual(self.datasource.get_url_params(), url_params)
112        self.assertEqual(expected, url_params)
113
114    def test_datasource_get_url_params_ds_config_override(self):
115        """Datasource configuration options can override url param defaults."""
116        sys_cfg = {
117            'datasource': {
118                'MyTestSubclass': {
119                    'max_wait': '1', 'timeout': '2',
120                    'retries': '3', 'sec_between_retries': 4
121                }}}
122        datasource = DataSourceTestSubclassNet(
123            sys_cfg, self.distro, self.paths)
124        expected = (1, 2, 3, 4)
125        url_params = datasource.get_url_params()
126        self.assertNotEqual(
127            (datasource.url_max_wait, datasource.url_timeout,
128             datasource.url_retries, datasource.url_sec_between_retries),
129            url_params)
130        self.assertEqual(expected, url_params)
131
132    def test_datasource_get_url_params_is_zero_or_greater(self):
133        """get_url_params ignores timeouts with a value below 0."""
134        # Set an override that is below 0 which gets ignored.
135        sys_cfg = {'datasource': {'_undef': {'timeout': '-1'}}}
136        datasource = DataSource(sys_cfg, self.distro, self.paths)
137        (_max_wait, timeout, _retries,
138         _sec_between_retries) = datasource.get_url_params()
139        self.assertEqual(0, timeout)
140
141    def test_datasource_get_url_uses_defaults_on_errors(self):
142        """On invalid system config values for url_params defaults are used."""
143        # All invalid values should be logged
144        sys_cfg = {'datasource': {
145            '_undef': {
146                'max_wait': 'nope', 'timeout': 'bug', 'retries': 'nonint'}}}
147        datasource = DataSource(sys_cfg, self.distro, self.paths)
148        url_params = datasource.get_url_params()
149        expected = (datasource.url_max_wait, datasource.url_timeout,
150                    datasource.url_retries, datasource.url_sec_between_retries)
151        self.assertEqual(expected, url_params)
152        logs = self.logs.getvalue()
153        expected_logs = [
154            "Config max_wait 'nope' is not an int, using default '-1'",
155            "Config timeout 'bug' is not an int, using default '10'",
156            "Config retries 'nonint' is not an int, using default '5'",
157        ]
158        for log in expected_logs:
159            self.assertIn(log, logs)
160
161    @mock.patch('cloudinit.sources.net.find_fallback_nic')
162    def test_fallback_interface_is_discovered(self, m_get_fallback_nic):
163        """The fallback_interface is discovered via find_fallback_nic."""
164        m_get_fallback_nic.return_value = 'nic9'
165        self.assertEqual('nic9', self.datasource.fallback_interface)
166
167    @mock.patch('cloudinit.sources.net.find_fallback_nic')
168    def test_fallback_interface_logs_undiscovered(self, m_get_fallback_nic):
169        """Log a warning when fallback_interface can not discover the nic."""
170        self.datasource._cloud_name = 'MySupahCloud'
171        m_get_fallback_nic.return_value = None  # Couldn't discover nic
172        self.assertIsNone(self.datasource.fallback_interface)
173        self.assertEqual(
174            'WARNING: Did not find a fallback interface on MySupahCloud.\n',
175            self.logs.getvalue())
176
177    @mock.patch('cloudinit.sources.net.find_fallback_nic')
178    def test_wb_fallback_interface_is_cached(self, m_get_fallback_nic):
179        """The fallback_interface is cached and won't be rediscovered."""
180        self.datasource._fallback_interface = 'nic10'
181        self.assertEqual('nic10', self.datasource.fallback_interface)
182        m_get_fallback_nic.assert_not_called()
183
184    def test__get_data_unimplemented(self):
185        """Raise an error when _get_data is not implemented."""
186        with self.assertRaises(NotImplementedError) as context_manager:
187            self.datasource.get_data()
188        self.assertIn(
189            'Subclasses of DataSource must implement _get_data',
190            str(context_manager.exception))
191        datasource2 = InvalidDataSourceTestSubclassNet(
192            self.sys_cfg, self.distro, self.paths)
193        with self.assertRaises(NotImplementedError) as context_manager:
194            datasource2.get_data()
195        self.assertIn(
196            'Subclasses of DataSource must implement _get_data',
197            str(context_manager.exception))
198
199    def test_get_data_calls_subclass__get_data(self):
200        """Datasource.get_data uses the subclass' version of _get_data."""
201        tmp = self.tmp_dir()
202        datasource = DataSourceTestSubclassNet(
203            self.sys_cfg, self.distro, Paths({'run_dir': tmp}))
204        self.assertTrue(datasource.get_data())
205        self.assertEqual(
206            {'availability_zone': 'myaz',
207             'local-hostname': 'test-subclass-hostname',
208             'region': 'myregion'},
209            datasource.metadata)
210        self.assertEqual('userdata_raw', datasource.userdata_raw)
211        self.assertEqual('vendordata_raw', datasource.vendordata_raw)
212
213    def test_get_hostname_strips_local_hostname_without_domain(self):
214        """Datasource.get_hostname strips metadata local-hostname of domain."""
215        tmp = self.tmp_dir()
216        datasource = DataSourceTestSubclassNet(
217            self.sys_cfg, self.distro, Paths({'run_dir': tmp}))
218        self.assertTrue(datasource.get_data())
219        self.assertEqual(
220            'test-subclass-hostname', datasource.metadata['local-hostname'])
221        self.assertEqual('test-subclass-hostname', datasource.get_hostname())
222        datasource.metadata['local-hostname'] = 'hostname.my.domain.com'
223        self.assertEqual('hostname', datasource.get_hostname())
224
225    def test_get_hostname_with_fqdn_returns_local_hostname_with_domain(self):
226        """Datasource.get_hostname with fqdn set gets qualified hostname."""
227        tmp = self.tmp_dir()
228        datasource = DataSourceTestSubclassNet(
229            self.sys_cfg, self.distro, Paths({'run_dir': tmp}))
230        self.assertTrue(datasource.get_data())
231        datasource.metadata['local-hostname'] = 'hostname.my.domain.com'
232        self.assertEqual(
233            'hostname.my.domain.com', datasource.get_hostname(fqdn=True))
234
235    def test_get_hostname_without_metadata_uses_system_hostname(self):
236        """Datasource.gethostname runs util.get_hostname when no metadata."""
237        tmp = self.tmp_dir()
238        datasource = DataSourceTestSubclassNet(
239            self.sys_cfg, self.distro, Paths({'run_dir': tmp}))
240        self.assertEqual({}, datasource.metadata)
241        mock_fqdn = 'cloudinit.sources.util.get_fqdn_from_hosts'
242        with mock.patch('cloudinit.sources.util.get_hostname') as m_gethost:
243            with mock.patch(mock_fqdn) as m_fqdn:
244                m_gethost.return_value = 'systemhostname.domain.com'
245                m_fqdn.return_value = None  # No maching fqdn in /etc/hosts
246                self.assertEqual('systemhostname', datasource.get_hostname())
247                self.assertEqual(
248                    'systemhostname.domain.com',
249                    datasource.get_hostname(fqdn=True))
250
251    def test_get_hostname_without_metadata_returns_none(self):
252        """Datasource.gethostname returns None when metadata_only and no MD."""
253        tmp = self.tmp_dir()
254        datasource = DataSourceTestSubclassNet(
255            self.sys_cfg, self.distro, Paths({'run_dir': tmp}))
256        self.assertEqual({}, datasource.metadata)
257        mock_fqdn = 'cloudinit.sources.util.get_fqdn_from_hosts'
258        with mock.patch('cloudinit.sources.util.get_hostname') as m_gethost:
259            with mock.patch(mock_fqdn) as m_fqdn:
260                self.assertIsNone(datasource.get_hostname(metadata_only=True))
261                self.assertIsNone(
262                    datasource.get_hostname(fqdn=True, metadata_only=True))
263        self.assertEqual([], m_gethost.call_args_list)
264        self.assertEqual([], m_fqdn.call_args_list)
265
266    def test_get_hostname_without_metadata_prefers_etc_hosts(self):
267        """Datasource.gethostname prefers /etc/hosts to util.get_hostname."""
268        tmp = self.tmp_dir()
269        datasource = DataSourceTestSubclassNet(
270            self.sys_cfg, self.distro, Paths({'run_dir': tmp}))
271        self.assertEqual({}, datasource.metadata)
272        mock_fqdn = 'cloudinit.sources.util.get_fqdn_from_hosts'
273        with mock.patch('cloudinit.sources.util.get_hostname') as m_gethost:
274            with mock.patch(mock_fqdn) as m_fqdn:
275                m_gethost.return_value = 'systemhostname.domain.com'
276                m_fqdn.return_value = 'fqdnhostname.domain.com'
277                self.assertEqual('fqdnhostname', datasource.get_hostname())
278                self.assertEqual('fqdnhostname.domain.com',
279                                 datasource.get_hostname(fqdn=True))
280
281    def test_get_data_does_not_write_instance_data_on_failure(self):
282        """get_data does not write INSTANCE_JSON_FILE on get_data False."""
283        tmp = self.tmp_dir()
284        datasource = DataSourceTestSubclassNet(
285            self.sys_cfg, self.distro, Paths({'run_dir': tmp}),
286            get_data_retval=False)
287        self.assertFalse(datasource.get_data())
288        json_file = self.tmp_path(INSTANCE_JSON_FILE, tmp)
289        self.assertFalse(
290            os.path.exists(json_file), 'Found unexpected file %s' % json_file)
291
292    def test_get_data_writes_json_instance_data_on_success(self):
293        """get_data writes INSTANCE_JSON_FILE to run_dir as world readable."""
294        tmp = self.tmp_dir()
295        datasource = DataSourceTestSubclassNet(
296            self.sys_cfg, self.distro, Paths({'run_dir': tmp}))
297        sys_info = {
298            "python": "3.7",
299            "platform":
300                "Linux-5.4.0-24-generic-x86_64-with-Ubuntu-20.04-focal",
301            "uname": ["Linux", "myhost", "5.4.0-24-generic", "SMP blah",
302                      "x86_64"],
303            "variant": "ubuntu", "dist": ["ubuntu", "20.04", "focal"]}
304        with mock.patch("cloudinit.util.system_info", return_value=sys_info):
305            datasource.get_data()
306        json_file = self.tmp_path(INSTANCE_JSON_FILE, tmp)
307        content = util.load_file(json_file)
308        expected = {
309            'base64_encoded_keys': [],
310            'merged_cfg': REDACT_SENSITIVE_VALUE,
311            'sensitive_keys': ['merged_cfg'],
312            'sys_info': sys_info,
313            'v1': {
314                '_beta_keys': ['subplatform'],
315                'availability-zone': 'myaz',
316                'availability_zone': 'myaz',
317                'cloud-name': 'subclasscloudname',
318                'cloud_name': 'subclasscloudname',
319                'distro': 'ubuntu',
320                'distro_release': 'focal',
321                'distro_version': '20.04',
322                'instance-id': 'iid-datasource',
323                'instance_id': 'iid-datasource',
324                'local-hostname': 'test-subclass-hostname',
325                'local_hostname': 'test-subclass-hostname',
326                'kernel_release': '5.4.0-24-generic',
327                'machine': 'x86_64',
328                'platform': 'mytestsubclass',
329                'public_ssh_keys': [],
330                'python_version': '3.7',
331                'region': 'myregion',
332                'system_platform':
333                    'Linux-5.4.0-24-generic-x86_64-with-Ubuntu-20.04-focal',
334                'subplatform': 'unknown',
335                'variant': 'ubuntu'},
336            'ds': {
337
338                '_doc': EXPERIMENTAL_TEXT,
339                'meta_data': {'availability_zone': 'myaz',
340                              'local-hostname': 'test-subclass-hostname',
341                              'region': 'myregion'}}}
342        self.assertEqual(expected, util.load_json(content))
343        file_stat = os.stat(json_file)
344        self.assertEqual(0o644, stat.S_IMODE(file_stat.st_mode))
345        self.assertEqual(expected, util.load_json(content))
346
347    def test_get_data_writes_redacted_public_json_instance_data(self):
348        """get_data writes redacted content to public INSTANCE_JSON_FILE."""
349        tmp = self.tmp_dir()
350        datasource = DataSourceTestSubclassNet(
351            self.sys_cfg, self.distro, Paths({'run_dir': tmp}),
352            custom_metadata={
353                'availability_zone': 'myaz',
354                'local-hostname': 'test-subclass-hostname',
355                'region': 'myregion',
356                'some': {'security-credentials': {
357                    'cred1': 'sekret', 'cred2': 'othersekret'}}})
358        self.assertCountEqual(
359            ('merged_cfg', 'security-credentials',),
360            datasource.sensitive_metadata_keys)
361        sys_info = {
362            "python": "3.7",
363            "platform":
364                "Linux-5.4.0-24-generic-x86_64-with-Ubuntu-20.04-focal",
365            "uname": ["Linux", "myhost", "5.4.0-24-generic", "SMP blah",
366                      "x86_64"],
367            "variant": "ubuntu", "dist": ["ubuntu", "20.04", "focal"]}
368        with mock.patch("cloudinit.util.system_info", return_value=sys_info):
369            datasource.get_data()
370        json_file = self.tmp_path(INSTANCE_JSON_FILE, tmp)
371        redacted = util.load_json(util.load_file(json_file))
372        expected = {
373            'base64_encoded_keys': [],
374            'merged_cfg': REDACT_SENSITIVE_VALUE,
375            'sensitive_keys': [
376                'ds/meta_data/some/security-credentials', 'merged_cfg'],
377            'sys_info': sys_info,
378            'v1': {
379                '_beta_keys': ['subplatform'],
380                'availability-zone': 'myaz',
381                'availability_zone': 'myaz',
382                'cloud-name': 'subclasscloudname',
383                'cloud_name': 'subclasscloudname',
384                'distro': 'ubuntu',
385                'distro_release': 'focal',
386                'distro_version': '20.04',
387                'instance-id': 'iid-datasource',
388                'instance_id': 'iid-datasource',
389                'local-hostname': 'test-subclass-hostname',
390                'local_hostname': 'test-subclass-hostname',
391                'kernel_release': '5.4.0-24-generic',
392                'machine': 'x86_64',
393                'platform': 'mytestsubclass',
394                'public_ssh_keys': [],
395                'python_version': '3.7',
396                'region': 'myregion',
397                'system_platform':
398                    'Linux-5.4.0-24-generic-x86_64-with-Ubuntu-20.04-focal',
399                'subplatform': 'unknown',
400                'variant': 'ubuntu'},
401            'ds': {
402                '_doc': EXPERIMENTAL_TEXT,
403                'meta_data': {
404                    'availability_zone': 'myaz',
405                    'local-hostname': 'test-subclass-hostname',
406                    'region': 'myregion',
407                    'some': {'security-credentials': REDACT_SENSITIVE_VALUE}}}
408        }
409        self.assertCountEqual(expected, redacted)
410        file_stat = os.stat(json_file)
411        self.assertEqual(0o644, stat.S_IMODE(file_stat.st_mode))
412
413    def test_get_data_writes_json_instance_data_sensitive(self):
414        """
415        get_data writes unmodified data to sensitive file as root-readonly.
416        """
417        tmp = self.tmp_dir()
418        datasource = DataSourceTestSubclassNet(
419            self.sys_cfg, self.distro, Paths({'run_dir': tmp}),
420            custom_metadata={
421                'availability_zone': 'myaz',
422                'local-hostname': 'test-subclass-hostname',
423                'region': 'myregion',
424                'some': {'security-credentials': {
425                    'cred1': 'sekret', 'cred2': 'othersekret'}}})
426        sys_info = {
427            "python": "3.7",
428            "platform":
429                "Linux-5.4.0-24-generic-x86_64-with-Ubuntu-20.04-focal",
430            "uname": ["Linux", "myhost", "5.4.0-24-generic", "SMP blah",
431                      "x86_64"],
432            "variant": "ubuntu", "dist": ["ubuntu", "20.04", "focal"]}
433
434        self.assertCountEqual(
435            ('merged_cfg', 'security-credentials',),
436            datasource.sensitive_metadata_keys)
437        with mock.patch("cloudinit.util.system_info", return_value=sys_info):
438            datasource.get_data()
439        sensitive_json_file = self.tmp_path(INSTANCE_JSON_SENSITIVE_FILE, tmp)
440        content = util.load_file(sensitive_json_file)
441        expected = {
442            'base64_encoded_keys': [],
443            'merged_cfg': {
444                '_doc': (
445                    'Merged cloud-init system config from '
446                    '/etc/cloud/cloud.cfg and /etc/cloud/cloud.cfg.d/'
447                ),
448                'datasource': {'_undef': {'key1': False}}},
449            'sensitive_keys': [
450                'ds/meta_data/some/security-credentials', 'merged_cfg'],
451            'sys_info': sys_info,
452            'v1': {
453                '_beta_keys': ['subplatform'],
454                'availability-zone': 'myaz',
455                'availability_zone': 'myaz',
456                'cloud-name': 'subclasscloudname',
457                'cloud_name': 'subclasscloudname',
458                'distro': 'ubuntu',
459                'distro_release': 'focal',
460                'distro_version': '20.04',
461                'instance-id': 'iid-datasource',
462                'instance_id': 'iid-datasource',
463                'kernel_release': '5.4.0-24-generic',
464                'local-hostname': 'test-subclass-hostname',
465                'local_hostname': 'test-subclass-hostname',
466                'machine': 'x86_64',
467                'platform': 'mytestsubclass',
468                'public_ssh_keys': [],
469                'python_version': '3.7',
470                'region': 'myregion',
471                'subplatform': 'unknown',
472                'system_platform':
473                    'Linux-5.4.0-24-generic-x86_64-with-Ubuntu-20.04-focal',
474                'variant': 'ubuntu'},
475            'ds': {
476                '_doc': EXPERIMENTAL_TEXT,
477                'meta_data': {
478                    'availability_zone': 'myaz',
479                    'local-hostname': 'test-subclass-hostname',
480                    'region': 'myregion',
481                    'some': {
482                        'security-credentials':
483                            {'cred1': 'sekret', 'cred2': 'othersekret'}}}}
484        }
485        self.assertCountEqual(expected, util.load_json(content))
486        file_stat = os.stat(sensitive_json_file)
487        self.assertEqual(0o600, stat.S_IMODE(file_stat.st_mode))
488        self.assertEqual(expected, util.load_json(content))
489
490    def test_get_data_handles_redacted_unserializable_content(self):
491        """get_data warns unserializable content in INSTANCE_JSON_FILE."""
492        tmp = self.tmp_dir()
493        datasource = DataSourceTestSubclassNet(
494            self.sys_cfg, self.distro, Paths({'run_dir': tmp}),
495            custom_metadata={'key1': 'val1', 'key2': {'key2.1': self.paths}})
496        datasource.get_data()
497        json_file = self.tmp_path(INSTANCE_JSON_FILE, tmp)
498        content = util.load_file(json_file)
499        expected_metadata = {
500            'key1': 'val1',
501            'key2': {
502                'key2.1': "Warning: redacted unserializable type <class"
503                          " 'cloudinit.helpers.Paths'>"}}
504        instance_json = util.load_json(content)
505        self.assertEqual(
506            expected_metadata, instance_json['ds']['meta_data'])
507
508    def test_persist_instance_data_writes_ec2_metadata_when_set(self):
509        """When ec2_metadata class attribute is set, persist to json."""
510        tmp = self.tmp_dir()
511        datasource = DataSourceTestSubclassNet(
512            self.sys_cfg, self.distro, Paths({'run_dir': tmp}))
513        datasource.ec2_metadata = UNSET
514        datasource.get_data()
515        json_file = self.tmp_path(INSTANCE_JSON_FILE, tmp)
516        instance_data = util.load_json(util.load_file(json_file))
517        self.assertNotIn('ec2_metadata', instance_data['ds'])
518        datasource.ec2_metadata = {'ec2stuff': 'is good'}
519        datasource.persist_instance_data()
520        instance_data = util.load_json(util.load_file(json_file))
521        self.assertEqual(
522            {'ec2stuff': 'is good'},
523            instance_data['ds']['ec2_metadata'])
524
525    def test_persist_instance_data_writes_network_json_when_set(self):
526        """When network_data.json class attribute is set, persist to json."""
527        tmp = self.tmp_dir()
528        datasource = DataSourceTestSubclassNet(
529            self.sys_cfg, self.distro, Paths({'run_dir': tmp}))
530        datasource.get_data()
531        json_file = self.tmp_path(INSTANCE_JSON_FILE, tmp)
532        instance_data = util.load_json(util.load_file(json_file))
533        self.assertNotIn('network_json', instance_data['ds'])
534        datasource.network_json = {'network_json': 'is good'}
535        datasource.persist_instance_data()
536        instance_data = util.load_json(util.load_file(json_file))
537        self.assertEqual(
538            {'network_json': 'is good'},
539            instance_data['ds']['network_json'])
540
541    def test_get_data_base64encodes_unserializable_bytes(self):
542        """On py3, get_data base64encodes any unserializable content."""
543        tmp = self.tmp_dir()
544        datasource = DataSourceTestSubclassNet(
545            self.sys_cfg, self.distro, Paths({'run_dir': tmp}),
546            custom_metadata={'key1': 'val1', 'key2': {'key2.1': b'\x123'}})
547        self.assertTrue(datasource.get_data())
548        json_file = self.tmp_path(INSTANCE_JSON_FILE, tmp)
549        content = util.load_file(json_file)
550        instance_json = util.load_json(content)
551        self.assertCountEqual(
552            ['ds/meta_data/key2/key2.1'],
553            instance_json['base64_encoded_keys'])
554        self.assertEqual(
555            {'key1': 'val1', 'key2': {'key2.1': 'EjM='}},
556            instance_json['ds']['meta_data'])
557
558    def test_get_hostname_subclass_support(self):
559        """Validate get_hostname signature on all subclasses of DataSource."""
560        base_args = inspect.getfullargspec(DataSource.get_hostname)
561        # Import all DataSource subclasses so we can inspect them.
562        modules = util.find_modules(os.path.dirname(os.path.dirname(__file__)))
563        for _loc, name in modules.items():
564            mod_locs, _ = importer.find_module(name, ['cloudinit.sources'], [])
565            if mod_locs:
566                importer.import_module(mod_locs[0])
567        for child in DataSource.__subclasses__():
568            if 'Test' in child.dsname:
569                continue
570            self.assertEqual(
571                base_args,
572                inspect.getfullargspec(child.get_hostname),
573                '%s does not implement DataSource.get_hostname params'
574                % child)
575            for grandchild in child.__subclasses__():
576                self.assertEqual(
577                    base_args,
578                    inspect.getfullargspec(grandchild.get_hostname),
579                    '%s does not implement DataSource.get_hostname params'
580                    % grandchild)
581
582    def test_clear_cached_attrs_resets_cached_attr_class_attributes(self):
583        """Class attributes listed in cached_attr_defaults are reset."""
584        count = 0
585        # Setup values for all cached class attributes
586        for attr, value in self.datasource.cached_attr_defaults:
587            setattr(self.datasource, attr, count)
588            count += 1
589        self.datasource._dirty_cache = True
590        self.datasource.clear_cached_attrs()
591        for attr, value in self.datasource.cached_attr_defaults:
592            self.assertEqual(value, getattr(self.datasource, attr))
593
594    def test_clear_cached_attrs_noops_on_clean_cache(self):
595        """Class attributes listed in cached_attr_defaults are reset."""
596        count = 0
597        # Setup values for all cached class attributes
598        for attr, _ in self.datasource.cached_attr_defaults:
599            setattr(self.datasource, attr, count)
600            count += 1
601        self.datasource._dirty_cache = False   # Fake clean cache
602        self.datasource.clear_cached_attrs()
603        count = 0
604        for attr, _ in self.datasource.cached_attr_defaults:
605            self.assertEqual(count, getattr(self.datasource, attr))
606            count += 1
607
608    def test_clear_cached_attrs_skips_non_attr_class_attributes(self):
609        """Skip any cached_attr_defaults which aren't class attributes."""
610        self.datasource._dirty_cache = True
611        self.datasource.clear_cached_attrs()
612        for attr in ('ec2_metadata', 'network_json'):
613            self.assertFalse(hasattr(self.datasource, attr))
614
615    def test_clear_cached_attrs_of_custom_attrs(self):
616        """Custom attr_values can be passed to clear_cached_attrs."""
617        self.datasource._dirty_cache = True
618        cached_attr_name = self.datasource.cached_attr_defaults[0][0]
619        setattr(self.datasource, cached_attr_name, 'himom')
620        self.datasource.myattr = 'orig'
621        self.datasource.clear_cached_attrs(
622            attr_defaults=(('myattr', 'updated'),))
623        self.assertEqual('himom', getattr(self.datasource, cached_attr_name))
624        self.assertEqual('updated', self.datasource.myattr)
625
626    @mock.patch.dict(DataSource.default_update_events, {
627        EventScope.NETWORK: {EventType.BOOT_NEW_INSTANCE}})
628    @mock.patch.dict(DataSource.supported_update_events, {
629        EventScope.NETWORK: {EventType.BOOT_NEW_INSTANCE}})
630    def test_update_metadata_only_acts_on_supported_update_events(self):
631        """update_metadata_if_supported wont get_data on unsupported events."""
632        self.assertEqual(
633            {EventScope.NETWORK: set([EventType.BOOT_NEW_INSTANCE])},
634            self.datasource.default_update_events
635        )
636
637        def fake_get_data():
638            raise Exception('get_data should not be called')
639
640        self.datasource.get_data = fake_get_data
641        self.assertFalse(
642            self.datasource.update_metadata_if_supported(
643                source_event_types=[EventType.BOOT]))
644
645    @mock.patch.dict(DataSource.supported_update_events, {
646        EventScope.NETWORK: {EventType.BOOT_NEW_INSTANCE}})
647    def test_update_metadata_returns_true_on_supported_update_event(self):
648        """update_metadata_if_supported returns get_data on supported events"""
649        def fake_get_data():
650            return True
651
652        self.datasource.get_data = fake_get_data
653        self.datasource._network_config = 'something'
654        self.datasource._dirty_cache = True
655        self.assertTrue(
656            self.datasource.update_metadata_if_supported(
657                source_event_types=[
658                    EventType.BOOT, EventType.BOOT_NEW_INSTANCE]))
659        self.assertEqual(UNSET, self.datasource._network_config)
660
661        self.assertIn(
662            "DEBUG: Update datasource metadata and network config due to"
663            " events: boot-new-instance",
664            self.logs.getvalue()
665        )
666
667
class TestRedactSensitiveData(CiTestCase):
    """Exercise redact_sensitive_keys against small metadata dicts."""

    def test_redact_sensitive_data_noop_when_no_sensitive_keys_present(self):
        """Metadata with absent or empty sensitive_keys compares equal."""
        metadata = {'my': 'data'}
        result = redact_sensitive_keys(metadata, redact_value='redacted')
        self.assertEqual(metadata, result)
        # Repeat with the key present but empty.
        metadata['sensitive_keys'] = []
        result = redact_sensitive_keys(metadata, redact_value='redacted')
        self.assertEqual(metadata, result)

    def test_redact_sensitive_data_redacts_exact_match_name(self):
        """Only the path listed in sensitive_keys has its value replaced."""
        metadata = {
            'sensitive_keys': ['md/secure'],
            'md': {'secure': 's3kr1t', 'insecure': 'publik'},
        }
        # Expected output differs from the input only at md/secure.
        expected = copy.deepcopy(metadata)
        expected['md']['secure'] = 'redacted'
        result = redact_sensitive_keys(metadata, redact_value='redacted')
        self.assertEqual(expected, result)

    def test_redact_sensitive_data_does_redacts_with_default_string(self):
        """Without redact_value the default redaction text is applied."""
        metadata = {
            'sensitive_keys': ['md/secure'],
            'md': {'secure': 's3kr1t', 'insecure': 'publik'},
        }
        expected = copy.deepcopy(metadata)
        expected['md']['secure'] = 'redacted for non-root user'
        result = redact_sensitive_keys(metadata)
        self.assertEqual(expected, result)
698
699
class TestCanonicalCloudID(CiTestCase):
    """Check canonical_cloud_id for cloud_name/region/platform mixes."""

    def test_cloud_id_returns_platform_on_unknowns(self):
        """Unknown region and cloud_name yield the platform."""
        cloud_id = canonical_cloud_id(
            cloud_name=METADATA_UNKNOWN, region=METADATA_UNKNOWN,
            platform='platform')
        self.assertEqual('platform', cloud_id)

    def test_cloud_id_returns_platform_on_none(self):
        """None for both region and cloud_name yields the platform."""
        cloud_id = canonical_cloud_id(
            cloud_name=None, region=None, platform='platform')
        self.assertEqual('platform', cloud_id)

    def test_cloud_id_returns_cloud_name_on_unknown_region(self):
        """A None or unknown region yields the cloud_name."""
        for unknown_region in (None, METADATA_UNKNOWN):
            cloud_id = canonical_cloud_id(
                cloud_name='cloudname', region=unknown_region,
                platform='platform')
            self.assertEqual('cloudname', cloud_id)

    def test_cloud_id_returns_platform_on_unknown_cloud_name(self):
        """A set region with an unknown cloud_name yields the platform."""
        cloud_id = canonical_cloud_id(
            cloud_name=METADATA_UNKNOWN, region='region',
            platform='platform')
        self.assertEqual('platform', cloud_id)

    def test_cloud_id_aws_based_on_region_and_cloud_name(self):
        """For cloud_name aws the cloud-id is derived from the region."""
        cases = (
            ('aws', 'cn-north-1', 'aws-china'),
            ('aws', 'us-east-1', 'aws'),
            ('aws', 'us-gov-1', 'aws-gov'),
            # Overridden non-aws cloud_name is returned
            ('!aws', 'us-gov-1', '!aws'),
        )
        for cloud_name, region, expected in cases:
            cloud_id = canonical_cloud_id(
                cloud_name=cloud_name, region=region, platform='platform')
            self.assertEqual(expected, cloud_id)

    def test_cloud_id_azure_based_on_region_and_cloud_name(self):
        """For cloud_name azure, region chinaeast yields azure-china."""
        cases = (
            ('chinaeast', 'azure-china'),
            ('!chinaeast', 'azure'),
        )
        for region, expected in cases:
            cloud_id = canonical_cloud_id(
                cloud_name='azure', region=region, platform='platform')
            self.assertEqual(expected, cloud_id)
770
771# vi: ts=4 expandtab
772