1#    Licensed under the Apache License, Version 2.0 (the "License"); you may
2#    not use this file except in compliance with the License. You may obtain
3#    a copy of the License at
4#
5#         http://www.apache.org/licenses/LICENSE-2.0
6#
7#    Unless required by applicable law or agreed to in writing, software
8#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
9#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
10#    License for the specific language governing permissions and limitations
11#    under the License.
12
13"""
14Functional test cases for the Castellan Oslo Config Driver.
15
Note: This requires a locally running instance of Vault.
17"""
18import tempfile
19
20from oslo_config import cfg
21from oslo_config import fixture
22
23from oslotest import base
24
25from castellan import _config_driver
26from castellan.common.objects import opaque_data
27from castellan.tests.unit.key_manager import fake
28
29
class CastellanSourceTestCase(base.BaseTestCase):
    """Tests for the Castellan configuration source driver.

    Each test gets a fresh driver instance and a fresh ``ConfigOpts``
    object managed through the oslo.config ``Config`` fixture.
    """

    def setUp(self):
        """Create the driver under test and an isolated configuration."""
        super(CastellanSourceTestCase, self).setUp()
        self.driver = _config_driver.CastellanConfigurationSourceDriver()
        self.conf = cfg.ConfigOpts()
        self.conf_fixture = self.useFixture(fixture.Config(self.conf))

    def _load_castellan_source_group(self, config_file, mapping_file):
        """Load raw values for a fully specified ``castellan_source`` group.

        :param config_file: value stored under the ``config_file`` option
        :param mapping_file: value stored under the ``mapping_file`` option
        """
        self.conf_fixture.load_raw_values(
            group='castellan_source',
            driver='castellan',
            config_file=config_file,
            mapping_file=mapping_file,
        )

    def test_incomplete_driver(self):
        """A group that only names the driver does not yield a source."""
        # Only ``driver`` is set here; the options this driver requires
        # are deliberately left out of the group.
        self.conf_fixture.load_raw_values(
            group='incomplete_driver',
            driver='castellan',
        )

        opened = self.conf._open_source_from_opt_group('incomplete_driver')

        self.assertIsNone(opened)
        self.assertEqual(self.conf.incomplete_driver.driver, 'castellan')

    def test_complete_driver(self):
        """A complete group builds the source with the configured files."""
        self._load_castellan_source_group('config.conf', 'mapping.conf')

        # Swap the source class for a mock so only the constructor
        # arguments are checked.
        patcher = base.mock.patch.object(
            _config_driver, 'CastellanConfigurationSource')
        with patcher as source_cls:
            self.driver.open_source_from_opt_group(
                self.conf, 'castellan_source')

            group = self.conf.castellan_source
            source_cls.assert_called_once_with(
                'castellan_source',
                group.config_file,
                group.mapping_file)

    def test_fetch_secret(self):
        """The source resolves a mapped option through the key manager."""
        # Fake key manager, pre-populated with the secret to look up.
        fake_manager = fake.fake_api()
        secret = opaque_data.OpaqueData(b"super_secret!")
        secret_id = fake_manager.store("fake_context", secret)

        # Contents of the driver's configuration and mapping files.
        config_text = "[key_manager]\nbackend=vault"
        mapping_text = "[DEFAULT]\nmy_secret=" + secret_id

        with tempfile.NamedTemporaryFile() as config_file, \
                tempfile.NamedTemporaryFile() as mapping_file:
            # Flush after writing so the contents are on disk when the
            # driver opens the files by name.
            for handle, text in ((config_file, config_text),
                                 (mapping_file, mapping_text)):
                handle.write(text.encode("utf-8"))
                handle.flush()

            self._load_castellan_source_group(config_file.name,
                                              mapping_file.name)

            source = self.driver.open_source_from_opt_group(
                self.conf, 'castellan_source')

            # Replace the source's key manager with the fake one.
            source._mngr = fake_manager

            # The mapping file points ``my_secret`` at ``secret_id``;
            # the first element of the lookup result must be the value
            # stored in the key manager under that id.
            result = source.get("DEFAULT", "my_secret", cfg.StrOpt(""))
            self.assertEqual("super_secret!", result[0])
109