1#    Licensed under the Apache License, Version 2.0 (the "License"); you may
2#    not use this file except in compliance with the License. You may obtain
3#    a copy of the License at
4#
5#         http://www.apache.org/licenses/LICENSE-2.0
6# #    Unless required by applicable law or agreed to in writing, software
7#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
8#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
9#    License for the specific language governing permissions and limitations
10#    under the License.
11
12import operator
13from unittest import mock
14import warnings
15
16from oslo_config import cfg
17import stevedore
18import testtools
19import yaml
20
21from oslo_policy import generator
22from oslo_policy import policy
23from oslo_policy.tests import base
24from oslo_serialization import jsonutils
25
26
27OPTS = {'base_rules': [policy.RuleDefault('admin', 'is_admin:True',
28                                          description='Basic admin check'),
29                       policy.DocumentedRuleDefault('owner',
30                                                    ('project_id:%'
31                                                     '(project_id)s'),
32                                                    'This is a long '
33                                                    'description to check '
34                                                    'that line wrapping '
35                                                    'functions properly',
36                                                    [{'path': '/foo/',
37                                                      'method': 'GET'},
38                                                     {'path': '/test/',
39                                                      'method': 'POST'}])],
40        'custom_field': [policy.RuleDefault('shared',
41                                            'field:networks:shared=True')],
42        'rules': [policy.RuleDefault('admin_or_owner',
43                                     'rule:admin or rule:owner')],
44        }
45
46
47class GenerateSampleYAMLTestCase(base.PolicyBaseTestCase):
48    def setUp(self):
49        super(GenerateSampleYAMLTestCase, self).setUp()
50        self.enforcer = policy.Enforcer(self.conf, policy_file='policy.yaml')
51
52    def test_generate_loadable_yaml(self):
53        extensions = []
54        for name, opts in OPTS.items():
55            ext = stevedore.extension.Extension(name=name, entry_point=None,
56                                                plugin=None, obj=opts)
57            extensions.append(ext)
58        test_mgr = stevedore.named.NamedExtensionManager.make_test_instance(
59            extensions=extensions, namespace=['base_rules', 'rules'])
60
61        output_file = self.get_config_file_fullname('policy.yaml')
62        with mock.patch('stevedore.named.NamedExtensionManager',
63                        return_value=test_mgr) as mock_ext_mgr:
64            # generate sample-policy file with only rules
65            generator._generate_sample(['base_rules', 'rules'], output_file,
66                                       include_help=False)
67            mock_ext_mgr.assert_called_once_with(
68                'oslo.policy.policies', names=['base_rules', 'rules'],
69                on_load_failure_callback=generator.on_load_failure_callback,
70                invoke_on_load=True)
71
72        self.enforcer.load_rules()
73
74        self.assertIn('owner', self.enforcer.rules)
75        self.assertIn('admin', self.enforcer.rules)
76        self.assertIn('admin_or_owner', self.enforcer.rules)
77        self.assertEqual('project_id:%(project_id)s',
78                         str(self.enforcer.rules['owner']))
79        self.assertEqual('is_admin:True', str(self.enforcer.rules['admin']))
80        self.assertEqual('(rule:admin or rule:owner)',
81                         str(self.enforcer.rules['admin_or_owner']))
82
83    def test_expected_content(self):
84        extensions = []
85        for name, opts in OPTS.items():
86            ext = stevedore.extension.Extension(name=name, entry_point=None,
87                                                plugin=None, obj=opts)
88            extensions.append(ext)
89        test_mgr = stevedore.named.NamedExtensionManager.make_test_instance(
90            extensions=extensions, namespace=['base_rules', 'rules'])
91
92        expected = '''# Basic admin check
93#"admin": "is_admin:True"
94
95# This is a long description to check that line wrapping functions
96# properly
97# GET  /foo/
98# POST  /test/
99#"owner": "project_id:%(project_id)s"
100
101#"shared": "field:networks:shared=True"
102
103#"admin_or_owner": "rule:admin or rule:owner"
104
105'''
106        output_file = self.get_config_file_fullname('policy.yaml')
107        with mock.patch('stevedore.named.NamedExtensionManager',
108                        return_value=test_mgr) as mock_ext_mgr:
109            generator._generate_sample(['base_rules', 'rules'], output_file)
110            mock_ext_mgr.assert_called_once_with(
111                'oslo.policy.policies', names=['base_rules', 'rules'],
112                on_load_failure_callback=generator.on_load_failure_callback,
113                invoke_on_load=True)
114
115        with open(output_file, 'r') as written_file:
116            written_policy = written_file.read()
117
118        self.assertEqual(expected, written_policy)
119
120    def test_expected_content_stdout(self):
121        extensions = []
122        for name, opts in OPTS.items():
123            ext = stevedore.extension.Extension(name=name, entry_point=None,
124                                                plugin=None, obj=opts)
125            extensions.append(ext)
126        test_mgr = stevedore.named.NamedExtensionManager.make_test_instance(
127            extensions=extensions, namespace=['base_rules', 'rules'])
128
129        expected = '''# Basic admin check
130#"admin": "is_admin:True"
131
132# This is a long description to check that line wrapping functions
133# properly
134# GET  /foo/
135# POST  /test/
136#"owner": "project_id:%(project_id)s"
137
138#"shared": "field:networks:shared=True"
139
140#"admin_or_owner": "rule:admin or rule:owner"
141
142'''
143        stdout = self._capture_stdout()
144        with mock.patch('stevedore.named.NamedExtensionManager',
145                        return_value=test_mgr) as mock_ext_mgr:
146            generator._generate_sample(['base_rules', 'rules'],
147                                       output_file=None)
148            mock_ext_mgr.assert_called_once_with(
149                'oslo.policy.policies', names=['base_rules', 'rules'],
150                on_load_failure_callback=generator.on_load_failure_callback,
151                invoke_on_load=True)
152
153        self.assertEqual(expected, stdout.getvalue())
154
155    def test_policies_deprecated_for_removal(self):
156        rule = policy.RuleDefault(
157            name='foo:post_bar',
158            check_str='role:fizz',
159            description='Create a bar.',
160            deprecated_for_removal=True,
161            deprecated_reason='This policy is not used anymore',
162            deprecated_since='N'
163        )
164        opts = {'rules': [rule]}
165
166        extensions = []
167        for name, opts, in opts.items():
168            ext = stevedore.extension.Extension(name=name, entry_point=None,
169                                                plugin=None, obj=opts)
170            extensions.append(ext)
171
172        test_mgr = stevedore.named.NamedExtensionManager.make_test_instance(
173            extensions=extensions, namespace=['rules']
174        )
175
176        expected = '''# DEPRECATED
177# "foo:post_bar" has been deprecated since N.
178# This policy is not used anymore
179# Create a bar.
180#"foo:post_bar": "role:fizz"
181
182'''
183        stdout = self._capture_stdout()
184        with mock.patch('stevedore.named.NamedExtensionManager',
185                        return_value=test_mgr) as mock_ext_mgr:
186            generator._generate_sample(['rules'], output_file=None)
187            mock_ext_mgr.assert_called_once_with(
188                'oslo.policy.policies', names=['rules'],
189                on_load_failure_callback=generator.on_load_failure_callback,
190                invoke_on_load=True
191            )
192        self.assertEqual(expected, stdout.getvalue())
193
194    def test_deprecated_policies_are_aliased_to_new_names(self):
195        deprecated_rule = policy.DeprecatedRule(
196            name='foo:post_bar',
197            check_str='role:fizz',
198            deprecated_reason=(
199                'foo:post_bar is being removed in favor of foo:create_bar'
200            ),
201            deprecated_since='N',
202        )
203        new_rule = policy.RuleDefault(
204            name='foo:create_bar',
205            check_str='role:fizz',
206            description='Create a bar.',
207            deprecated_rule=deprecated_rule,
208        )
209        opts = {'rules': [new_rule]}
210
211        extensions = []
212        for name, opts in opts.items():
213            ext = stevedore.extension.Extension(name=name, entry_point=None,
214                                                plugin=None, obj=opts)
215            extensions.append(ext)
216        test_mgr = stevedore.named.NamedExtensionManager.make_test_instance(
217            extensions=extensions, namespace=['rules'])
218
219        expected = '''# Create a bar.
220#"foo:create_bar": "role:fizz"
221
222# DEPRECATED
223# "foo:post_bar":"role:fizz" has been deprecated since N in favor of
224# "foo:create_bar":"role:fizz".
225# foo:post_bar is being removed in favor of foo:create_bar
226"foo:post_bar": "rule:foo:create_bar"
227
228'''
229        stdout = self._capture_stdout()
230        with mock.patch('stevedore.named.NamedExtensionManager',
231                        return_value=test_mgr) as mock_ext_mgr:
232            generator._generate_sample(['rules'], output_file=None)
233            mock_ext_mgr.assert_called_once_with(
234                'oslo.policy.policies', names=['rules'],
235                on_load_failure_callback=generator.on_load_failure_callback,
236                invoke_on_load=True
237            )
238        self.assertEqual(expected, stdout.getvalue())
239
240    def test_deprecated_policies_with_same_name(self):
241        deprecated_rule = policy.DeprecatedRule(
242            name='foo:create_bar',
243            check_str='role:old',
244            deprecated_reason=(
245                'role:fizz is a more sane default for foo:create_bar'
246            ),
247            deprecated_since='N',
248        )
249        new_rule = policy.RuleDefault(
250            name='foo:create_bar',
251            check_str='role:fizz',
252            description='Create a bar.',
253            deprecated_rule=deprecated_rule,
254        )
255        opts = {'rules': [new_rule]}
256
257        extensions = []
258        for name, opts in opts.items():
259            ext = stevedore.extension.Extension(name=name, entry_point=None,
260                                                plugin=None, obj=opts)
261            extensions.append(ext)
262        test_mgr = stevedore.named.NamedExtensionManager.make_test_instance(
263            extensions=extensions, namespace=['rules'])
264
265        expected = '''# Create a bar.
266#"foo:create_bar": "role:fizz"
267
268# DEPRECATED
269# "foo:create_bar":"role:old" has been deprecated since N in favor of
270# "foo:create_bar":"role:fizz".
271# role:fizz is a more sane default for foo:create_bar
272
273'''
274        stdout = self._capture_stdout()
275        with mock.patch('stevedore.named.NamedExtensionManager',
276                        return_value=test_mgr) as mock_ext_mgr:
277            generator._generate_sample(['rules'], output_file=None)
278            mock_ext_mgr.assert_called_once_with(
279                'oslo.policy.policies', names=['rules'],
280                on_load_failure_callback=generator.on_load_failure_callback,
281                invoke_on_load=True
282            )
283        self.assertEqual(expected, stdout.getvalue())
284
285    def _test_formatting(self, description, expected):
286        rule = [policy.RuleDefault('admin', 'is_admin:True',
287                                   description=description)]
288        ext = stevedore.extension.Extension(name='check_rule',
289                                            entry_point=None,
290                                            plugin=None, obj=rule)
291        test_mgr = stevedore.named.NamedExtensionManager.make_test_instance(
292            extensions=[ext], namespace=['check_rule'])
293
294        output_file = self.get_config_file_fullname('policy.yaml')
295        with mock.patch('stevedore.named.NamedExtensionManager',
296                        return_value=test_mgr) as mock_ext_mgr:
297            generator._generate_sample(['check_rule'], output_file)
298            mock_ext_mgr.assert_called_once_with(
299                'oslo.policy.policies', names=['check_rule'],
300                on_load_failure_callback=generator.on_load_failure_callback,
301                invoke_on_load=True)
302
303        with open(output_file, 'r') as written_file:
304            written_policy = written_file.read()
305
306        self.assertEqual(expected, written_policy)
307
308    def test_empty_line_formatting(self):
309        description = ('Check Summary \n'
310                       '\n'
311                       'This is a description to '
312                       'check that empty line has '
313                       'no white spaces.')
314        expected = """# Check Summary
315#
316# This is a description to check that empty line has no white spaces.
317#"admin": "is_admin:True"
318
319"""
320
321        self._test_formatting(description, expected)
322
323    def test_paragraph_formatting(self):
324        description = """
325Here's a neat description with a paragraph. We want to make sure that it wraps
326properly.
327"""
328        expected = """# Here's a neat description with a paragraph. We want \
329to make sure
330# that it wraps properly.
331#"admin": "is_admin:True"
332
333"""
334
335        self._test_formatting(description, expected)
336
337    def test_literal_block_formatting(self):
338        description = """Here's another description.
339
340    This one has a literal block.
341    These lines should be kept apart.
342    They should not be wrapped, even though they may be longer than 70 chars
343"""
344        expected = """# Here's another description.
345#
346#     This one has a literal block.
347#     These lines should be kept apart.
348#     They should not be wrapped, even though they may be longer than 70 chars
349#"admin": "is_admin:True"
350
351"""
352
353        self._test_formatting(description, expected)
354
355    def test_invalid_formatting(self):
356        description = """Here's a broken description.
357
358We have some text...
359    Followed by a literal block without any spaces.
360    We don't support definition lists, so this is just wrong!
361"""
362        expected = """# Here's a broken description.
363#
364# We have some text...
365#
366#     Followed by a literal block without any spaces.
367#     We don't support definition lists, so this is just wrong!
368#"admin": "is_admin:True"
369
370"""
371
372        with warnings.catch_warnings(record=True) as warns:
373            self._test_formatting(description, expected)
374            self.assertEqual(1, len(warns))
375            self.assertTrue(issubclass(warns[-1].category, FutureWarning))
376            self.assertIn('Invalid policy description', str(warns[-1].message))
377
378
379class GenerateSampleJSONTestCase(base.PolicyBaseTestCase):
380    def setUp(self):
381        super(GenerateSampleJSONTestCase, self).setUp()
382        self.enforcer = policy.Enforcer(self.conf, policy_file='policy.json')
383
384    def test_generate_loadable_json(self):
385        extensions = []
386        for name, opts in OPTS.items():
387            ext = stevedore.extension.Extension(name=name, entry_point=None,
388                                                plugin=None, obj=opts)
389            extensions.append(ext)
390        test_mgr = stevedore.named.NamedExtensionManager.make_test_instance(
391            extensions=extensions, namespace=['base_rules', 'rules'])
392
393        output_file = self.get_config_file_fullname('policy.json')
394        with mock.patch('stevedore.named.NamedExtensionManager',
395                        return_value=test_mgr) as mock_ext_mgr:
396            # generate sample-policy file with only rules
397            generator._generate_sample(['base_rules', 'rules'], output_file,
398                                       output_format='json',
399                                       include_help=False)
400            mock_ext_mgr.assert_called_once_with(
401                'oslo.policy.policies', names=['base_rules', 'rules'],
402                on_load_failure_callback=generator.on_load_failure_callback,
403                invoke_on_load=True)
404
405        self.enforcer.load_rules()
406
407        self.assertIn('owner', self.enforcer.rules)
408        self.assertIn('admin', self.enforcer.rules)
409        self.assertIn('admin_or_owner', self.enforcer.rules)
410        self.assertEqual('project_id:%(project_id)s',
411                         str(self.enforcer.rules['owner']))
412        self.assertEqual('is_admin:True', str(self.enforcer.rules['admin']))
413        self.assertEqual('(rule:admin or rule:owner)',
414                         str(self.enforcer.rules['admin_or_owner']))
415
416    def test_expected_content(self):
417        extensions = []
418        for name, opts in OPTS.items():
419            ext = stevedore.extension.Extension(name=name, entry_point=None,
420                                                plugin=None, obj=opts)
421            extensions.append(ext)
422        test_mgr = stevedore.named.NamedExtensionManager.make_test_instance(
423            extensions=extensions, namespace=['base_rules', 'rules'])
424
425        expected = '''{
426    "admin": "is_admin:True",
427    "owner": "project_id:%(project_id)s",
428    "shared": "field:networks:shared=True",
429    "admin_or_owner": "rule:admin or rule:owner"
430}
431'''
432        output_file = self.get_config_file_fullname('policy.json')
433        with mock.patch('stevedore.named.NamedExtensionManager',
434                        return_value=test_mgr) as mock_ext_mgr:
435            generator._generate_sample(['base_rules', 'rules'],
436                                       output_file=output_file,
437                                       output_format='json')
438            mock_ext_mgr.assert_called_once_with(
439                'oslo.policy.policies', names=['base_rules', 'rules'],
440                on_load_failure_callback=generator.on_load_failure_callback,
441                invoke_on_load=True)
442
443        with open(output_file, 'r') as written_file:
444            written_policy = written_file.read()
445
446        self.assertEqual(expected, written_policy)
447
448    def test_expected_content_stdout(self):
449        extensions = []
450        for name, opts in OPTS.items():
451            ext = stevedore.extension.Extension(name=name, entry_point=None,
452                                                plugin=None, obj=opts)
453            extensions.append(ext)
454        test_mgr = stevedore.named.NamedExtensionManager.make_test_instance(
455            extensions=extensions, namespace=['base_rules', 'rules'])
456
457        expected = '''{
458    "admin": "is_admin:True",
459    "owner": "project_id:%(project_id)s",
460    "shared": "field:networks:shared=True",
461    "admin_or_owner": "rule:admin or rule:owner"
462}
463'''
464        stdout = self._capture_stdout()
465        with mock.patch('stevedore.named.NamedExtensionManager',
466                        return_value=test_mgr) as mock_ext_mgr:
467            generator._generate_sample(['base_rules', 'rules'],
468                                       output_file=None,
469                                       output_format='json')
470            mock_ext_mgr.assert_called_once_with(
471                'oslo.policy.policies', names=['base_rules', 'rules'],
472                on_load_failure_callback=generator.on_load_failure_callback,
473                invoke_on_load=True)
474
475        self.assertEqual(expected, stdout.getvalue())
476
477    @mock.patch.object(generator, 'LOG')
478    def test_generate_json_file_log_warning(self, mock_log):
479        extensions = []
480        for name, opts in OPTS.items():
481            ext = stevedore.extension.Extension(name=name, entry_point=None,
482                                                plugin=None, obj=opts)
483            extensions.append(ext)
484        test_mgr = stevedore.named.NamedExtensionManager.make_test_instance(
485            extensions=extensions, namespace=['base_rules', 'rules'])
486
487        output_file = self.get_config_file_fullname('policy.json')
488        with mock.patch('stevedore.named.NamedExtensionManager',
489                        return_value=test_mgr):
490            generator._generate_sample(['base_rules', 'rules'], output_file,
491                                       output_format='json')
492            mock_log.warning.assert_any_call(policy.WARN_JSON)
493
494
495class GeneratorRaiseErrorTestCase(testtools.TestCase):
496    def test_generator_raises_error(self):
497        """Verifies that errors from extension manager are not suppressed."""
498        class FakeException(Exception):
499            pass
500
501        class FakeEP(object):
502
503            def __init__(self):
504                self.name = 'callback_is_expected'
505                self.require = self.resolve
506                self.load = self.resolve
507
508            def resolve(self, *args, **kwargs):
509                raise FakeException()
510
511        fake_ep = FakeEP()
512        with mock.patch('stevedore.named.NamedExtensionManager',
513                        side_effect=FakeException()):
514            self.assertRaises(FakeException, generator._generate_sample,
515                              fake_ep.name)
516
517    def test_generator_call_with_no_arguments_raises_error(self):
518        testargs = ['oslopolicy-sample-generator']
519        with mock.patch('sys.argv', testargs):
520            local_conf = cfg.ConfigOpts()
521            self.assertRaises(cfg.RequiredOptError, generator.generate_sample,
522                              [], local_conf)
523
524
525class GeneratePolicyTestCase(base.PolicyBaseTestCase):
526    def setUp(self):
527        super(GeneratePolicyTestCase, self).setUp()
528
529    def test_merged_rules(self):
530        extensions = []
531        for name, opts in OPTS.items():
532            ext = stevedore.extension.Extension(name=name, entry_point=None,
533                                                plugin=None, obj=opts)
534            extensions.append(ext)
535        test_mgr = stevedore.named.NamedExtensionManager.make_test_instance(
536            extensions=extensions, namespace=['base_rules', 'rules'])
537
538        # Write the policy file for an enforcer to load
539        sample_file = self.get_config_file_fullname('policy-sample.yaml')
540        with mock.patch('stevedore.named.NamedExtensionManager',
541                        return_value=test_mgr):
542            # generate sample-policy file with only rules
543            generator._generate_sample(['base_rules', 'rules'], sample_file,
544                                       include_help=False)
545
546        enforcer = policy.Enforcer(self.conf, policy_file='policy-sample.yaml')
547        # register an opt defined in the file
548        enforcer.register_default(policy.RuleDefault('admin',
549                                                     'is_admin:False'))
550        # register a new opt
551        enforcer.register_default(policy.RuleDefault('foo', 'role:foo'))
552
553        # Mock out stevedore to return the configured enforcer
554        ext = stevedore.extension.Extension(name='testing', entry_point=None,
555                                            plugin=None, obj=enforcer)
556        test_mgr = stevedore.named.NamedExtensionManager.make_test_instance(
557            extensions=[ext], namespace='testing')
558
559        # Generate a merged file
560        merged_file = self.get_config_file_fullname('policy-merged.yaml')
561        with mock.patch('stevedore.named.NamedExtensionManager',
562                        return_value=test_mgr) as mock_ext_mgr:
563            generator._generate_policy(namespace='testing',
564                                       output_file=merged_file)
565            mock_ext_mgr.assert_called_once_with(
566                'oslo.policy.enforcer', names=['testing'],
567                on_load_failure_callback=generator.on_load_failure_callback,
568                invoke_on_load=True)
569
570        # load the merged file with a new enforcer
571        merged_enforcer = policy.Enforcer(self.conf,
572                                          policy_file='policy-merged.yaml')
573        merged_enforcer.load_rules()
574        for rule in ['admin', 'owner', 'admin_or_owner', 'foo']:
575            self.assertIn(rule, merged_enforcer.rules)
576
577        self.assertEqual('is_admin:True', str(merged_enforcer.rules['admin']))
578        self.assertEqual('role:foo', str(merged_enforcer.rules['foo']))
579
580
581class ListRedundantTestCase(base.PolicyBaseTestCase):
582    def setUp(self):
583        super(ListRedundantTestCase, self).setUp()
584
585    @mock.patch('warnings.warn')
586    def test_matched_rules(self, mock_warn):
587        extensions = []
588        for name, opts in OPTS.items():
589            ext = stevedore.extension.Extension(name=name, entry_point=None,
590                                                plugin=None, obj=opts)
591            extensions.append(ext)
592        test_mgr = stevedore.named.NamedExtensionManager.make_test_instance(
593            extensions=extensions, namespace=['base_rules', 'rules'])
594
595        # Write the policy file for an enforcer to load
596        sample_file = self.get_config_file_fullname('policy-sample.yaml')
597        with mock.patch('stevedore.named.NamedExtensionManager',
598                        return_value=test_mgr):
599            # generate sample-policy file with only rules
600            generator._generate_sample(['base_rules', 'rules'], sample_file,
601                                       include_help=False)
602
603        enforcer = policy.Enforcer(self.conf, policy_file='policy-sample.yaml')
604        # register opts that match those defined in policy-sample.yaml
605        enforcer.register_default(policy.RuleDefault('admin', 'is_admin:True'))
606        enforcer.register_default(
607            policy.RuleDefault('owner', 'project_id:%(project_id)s'))
608        # register a new opt
609        deprecated_rule = policy.DeprecatedRule(
610            name='old_foo',
611            check_str='role:bar',
612            deprecated_reason='reason',
613            deprecated_since='T'
614        )
615        enforcer.register_default(
616            policy.RuleDefault(
617                name='foo',
618                check_str='role:foo',
619                deprecated_rule=deprecated_rule,
620            ),
621        )
622
623        # Mock out stevedore to return the configured enforcer
624        ext = stevedore.extension.Extension(name='testing', entry_point=None,
625                                            plugin=None, obj=enforcer)
626        test_mgr = stevedore.named.NamedExtensionManager.make_test_instance(
627            extensions=[ext], namespace='testing')
628
629        stdout = self._capture_stdout()
630        with mock.patch('stevedore.named.NamedExtensionManager',
631                        return_value=test_mgr) as mock_ext_mgr:
632            generator._list_redundant(namespace='testing')
633            mock_ext_mgr.assert_called_once_with(
634                'oslo.policy.enforcer', names=['testing'],
635                on_load_failure_callback=generator.on_load_failure_callback,
636                invoke_on_load=True)
637
638        matches = [line.split(': ', 1) for
639                   line in stdout.getvalue().splitlines()]
640        matches.sort(key=operator.itemgetter(0))
641
642        # Should be 'admin'
643        opt0 = matches[0]
644        self.assertEqual('"admin"', opt0[0])
645        self.assertEqual('"is_admin:True"', opt0[1])
646
647        # Should be 'owner'
648        opt1 = matches[1]
649        self.assertEqual('"owner"', opt1[0])
650        self.assertEqual('"project_id:%(project_id)s"', opt1[1])
651
652        self.assertFalse(mock_warn.called,
653                         'Deprecation warnings not suppressed.')
654
655
656class UpgradePolicyTestCase(base.PolicyBaseTestCase):
657    def setUp(self):
658        super(UpgradePolicyTestCase, self).setUp()
659        policy_json_contents = jsonutils.dumps({
660            "deprecated_name": "rule:admin"
661        })
662        self.create_config_file('policy.json', policy_json_contents)
663        deprecated_policy = policy.DeprecatedRule(
664            name='deprecated_name',
665            check_str='rule:admin',
666            deprecated_reason='test',
667            deprecated_since='Stein',
668        )
669        self.new_policy = policy.DocumentedRuleDefault(
670            name='new_policy_name',
671            check_str='rule:admin',
672            description='test_policy',
673            operations=[{'path': '/test', 'method': 'GET'}],
674            deprecated_rule=deprecated_policy,
675        )
676        self.extensions = []
677        ext = stevedore.extension.Extension(name='test_upgrade',
678                                            entry_point=None,
679                                            plugin=None,
680                                            obj=[self.new_policy])
681        self.extensions.append(ext)
682        # Just used for cli opt parsing
683        self.local_conf = cfg.ConfigOpts()
684
685    def test_upgrade_policy_json_file(self):
686        test_mgr = stevedore.named.NamedExtensionManager.make_test_instance(
687            extensions=self.extensions, namespace='test_upgrade')
688        with mock.patch('stevedore.named.NamedExtensionManager',
689                        return_value=test_mgr):
690            testargs = ['olsopolicy-policy-upgrade',
691                        '--policy',
692                        self.get_config_file_fullname('policy.json'),
693                        '--namespace', 'test_upgrade',
694                        '--output-file',
695                        self.get_config_file_fullname('new_policy.json'),
696                        '--format', 'json']
697            with mock.patch('sys.argv', testargs):
698                generator.upgrade_policy(conf=self.local_conf)
699                new_file = self.get_config_file_fullname('new_policy.json')
700                with open(new_file, 'r') as fh:
701                    new_policy = jsonutils.loads(fh.read())
702                self.assertIsNotNone(new_policy.get('new_policy_name'))
703                self.assertIsNone(new_policy.get('deprecated_name'))
704
705    @mock.patch.object(generator, 'LOG')
706    def test_upgrade_policy_json_file_log_warning(self, mock_log):
707        test_mgr = stevedore.named.NamedExtensionManager.make_test_instance(
708            extensions=self.extensions, namespace='test_upgrade')
709        with mock.patch('stevedore.named.NamedExtensionManager',
710                        return_value=test_mgr):
711            testargs = ['olsopolicy-policy-upgrade',
712                        '--policy',
713                        self.get_config_file_fullname('policy.json'),
714                        '--namespace', 'test_upgrade',
715                        '--output-file',
716                        self.get_config_file_fullname('new_policy.json'),
717                        '--format', 'json']
718            with mock.patch('sys.argv', testargs):
719                generator.upgrade_policy(conf=self.local_conf)
720                mock_log.warning.assert_any_call(policy.WARN_JSON)
721
722    def test_upgrade_policy_yaml_file(self):
723        test_mgr = stevedore.named.NamedExtensionManager.make_test_instance(
724            extensions=self.extensions, namespace='test_upgrade')
725        with mock.patch('stevedore.named.NamedExtensionManager',
726                        return_value=test_mgr):
727            testargs = ['olsopolicy-policy-upgrade',
728                        '--policy',
729                        self.get_config_file_fullname('policy.json'),
730                        '--namespace', 'test_upgrade',
731                        '--output-file',
732                        self.get_config_file_fullname('new_policy.yaml'),
733                        '--format', 'yaml']
734            with mock.patch('sys.argv', testargs):
735                generator.upgrade_policy(conf=self.local_conf)
736                new_file = self.get_config_file_fullname('new_policy.yaml')
737                with open(new_file, 'r') as fh:
738                    new_policy = yaml.safe_load(fh)
739                self.assertIsNotNone(new_policy.get('new_policy_name'))
740                self.assertIsNone(new_policy.get('deprecated_name'))
741
742    def test_upgrade_policy_json_stdout(self):
743        test_mgr = stevedore.named.NamedExtensionManager.make_test_instance(
744            extensions=self.extensions, namespace='test_upgrade')
745        stdout = self._capture_stdout()
746        with mock.patch('stevedore.named.NamedExtensionManager',
747                        return_value=test_mgr):
748            testargs = ['olsopolicy-policy-upgrade',
749                        '--policy',
750                        self.get_config_file_fullname('policy.json'),
751                        '--namespace', 'test_upgrade',
752                        '--format', 'json']
753            with mock.patch('sys.argv', testargs):
754                generator.upgrade_policy(conf=self.local_conf)
755                expected = '''{
756    "new_policy_name": "rule:admin"
757}'''
758                self.assertEqual(expected, stdout.getvalue())
759
760    def test_upgrade_policy_yaml_stdout(self):
761        test_mgr = stevedore.named.NamedExtensionManager.make_test_instance(
762            extensions=self.extensions, namespace='test_upgrade')
763        stdout = self._capture_stdout()
764        with mock.patch('stevedore.named.NamedExtensionManager',
765                        return_value=test_mgr):
766            testargs = ['olsopolicy-policy-upgrade',
767                        '--policy',
768                        self.get_config_file_fullname('policy.json'),
769                        '--namespace', 'test_upgrade',
770                        '--format', 'yaml']
771            with mock.patch('sys.argv', testargs):
772                generator.upgrade_policy(conf=self.local_conf)
773                expected = '''new_policy_name: rule:admin
774'''
775                self.assertEqual(expected, stdout.getvalue())
776
777
778@mock.patch('stevedore.named.NamedExtensionManager')
779class GetEnforcerTestCase(base.PolicyBaseTestCase):
780    def test_get_enforcer(self, mock_manager):
781        mock_instance = mock.MagicMock()
782        mock_instance.__contains__.return_value = True
783        mock_manager.return_value = mock_instance
784        mock_item = mock.Mock()
785        mock_item.obj = 'test'
786        mock_instance.__getitem__.return_value = mock_item
787        self.assertEqual('test', generator._get_enforcer('foo'))
788
789    def test_get_enforcer_missing(self, mock_manager):
790        mock_instance = mock.MagicMock()
791        mock_instance.__contains__.return_value = False
792        mock_manager.return_value = mock_instance
793        self.assertRaises(KeyError, generator._get_enforcer, 'nonexistent')
794
795
796class ValidatorTestCase(base.PolicyBaseTestCase):
797    def _get_test_enforcer(self):
798        test_rules = [policy.RuleDefault('foo', 'foo:bar=baz'),
799                      policy.RuleDefault('bar', 'bar:foo=baz')]
800        enforcer = policy.Enforcer(self.conf)
801        enforcer.register_defaults(test_rules)
802        return enforcer
803
804    def _test_policy(self, rule, success=False, missing_file=False):
805        policy_file = self.get_config_file_fullname('test.yaml')
806        if missing_file:
807            policy_file = 'bogus.yaml'
808        self.create_config_file('test.yaml', rule)
809        self.create_config_file('test.conf',
810                                '[oslo_policy]\npolicy_file=%s' % policy_file)
811        # Reparse now that we've created our configs
812        self.conf(args=['--config-dir', self.config_dir])
813
814        with mock.patch('oslo_policy.generator._get_enforcer') as ge:
815            ge.return_value = self._get_test_enforcer()
816            result = generator._validate_policy('test')
817            if success:
818                self.assertEqual(0, result)
819            else:
820                self.assertEqual(1, result)
821
822    def test_success(self):
823        self._test_policy('foo: rule:bar', success=True)
824
825    def test_cyclical_reference(self):
826        self._test_policy('foo: rule:bar\nbar: rule:foo')
827
828    def test_invalid_syntax(self):
829        self._test_policy('foo: (bar))')
830
831    def test_false_okay(self):
832        self._test_policy('foo: !', success=True)
833
834    def test_reference_nonexistent(self):
835        self._test_policy('foo: rule:baz')
836
837    def test_nonexistent(self):
838        self._test_policy('baz: rule:foo')
839
840    def test_missing_policy_file(self):
841        self._test_policy('', missing_file=True)
842
843
844class ConvertJsonToYamlTestCase(base.PolicyBaseTestCase):
845    def setUp(self):
846        super(ConvertJsonToYamlTestCase, self).setUp()
847        policy_json_contents = jsonutils.dumps({
848            "rule1_name": "rule:admin",
849            "rule2_name": "rule:overridden",
850            "deprecated_rule1_name": "rule:admin"
851        })
852        self.create_config_file('policy.json', policy_json_contents)
853        self.output_file_path = self.get_config_file_fullname(
854            'converted_policy.yaml')
855        deprecated_policy = policy.DeprecatedRule(
856            name='deprecated_rule1_name',
857            check_str='rule:admin',
858            deprecated_reason='testing',
859            deprecated_since='ussuri',
860        )
861        self.registered_policy = [
862            policy.DocumentedRuleDefault(
863                name='rule1_name',
864                check_str='rule:admin',
865                description='test_rule1',
866                operations=[{'path': '/test', 'method': 'GET'}],
867                deprecated_rule=deprecated_policy,
868                scope_types=['system'],
869            ),
870            policy.RuleDefault(
871                name='rule2_name',
872                check_str='rule:admin',
873            )
874        ]
875        self.extensions = []
876        ext = stevedore.extension.Extension(name='test',
877                                            entry_point=None,
878                                            plugin=None,
879                                            obj=self.registered_policy)
880        self.extensions.append(ext)
881        # Just used for cli opt parsing
882        self.local_conf = cfg.ConfigOpts()
883
884        self.expected = '''# test_rule1
885# GET  /test
886# Intended scope(s): system
887#"rule1_name": "rule:admin"
888
889# rule2_name
890"rule2_name": "rule:overridden"
891
892# WARNING: Below rules are either deprecated rules
893# or extra rules in policy file, it is strongly
894# recommended to switch to new rules.
895"deprecated_rule1_name": "rule:admin"
896'''
897
898    def _is_yaml(self, data):
899        is_yaml = False
900        try:
901            jsonutils.loads(data)
902        except ValueError:
903            try:
904                yaml.safe_load(data)
905                is_yaml = True
906            except yaml.scanner.ScannerError:
907                pass
908        return is_yaml
909
910    def _test_convert_json_to_yaml_file(self, output_to_file=True):
911        test_mgr = stevedore.named.NamedExtensionManager.make_test_instance(
912            extensions=self.extensions, namespace='test')
913        converted_policy_data = None
914        with mock.patch('stevedore.named.NamedExtensionManager',
915                        return_value=test_mgr):
916            testargs = ['oslopolicy-convert-json-to-yaml',
917                        '--namespace', 'test',
918                        '--policy-file',
919                        self.get_config_file_fullname('policy.json')]
920            if output_to_file:
921                testargs.extend(['--output-file',
922                                 self.output_file_path])
923            with mock.patch('sys.argv', testargs):
924                generator.convert_policy_json_to_yaml(conf=self.local_conf)
925                if output_to_file:
926                    with open(self.output_file_path, 'r') as fh:
927                        converted_policy_data = fh.read()
928        return converted_policy_data
929
930    def test_convert_json_to_yaml_file(self):
931        converted_policy_data = self._test_convert_json_to_yaml_file()
932        self.assertTrue(self._is_yaml(converted_policy_data))
933        self.assertEqual(self.expected, converted_policy_data)
934
935    def test_convert_policy_to_stdout(self):
936        stdout = self._capture_stdout()
937        self._test_convert_json_to_yaml_file(output_to_file=False)
938        self.assertEqual(self.expected, stdout.getvalue())
939
940    def test_converted_yaml_is_loadable(self):
941        self._test_convert_json_to_yaml_file()
942        enforcer = policy.Enforcer(self.conf,
943                                   policy_file=self.output_file_path)
944        enforcer.load_rules()
945        for rule in ['rule2_name', 'deprecated_rule1_name']:
946            self.assertIn(rule, enforcer.rules)
947
948    def test_default_rules_comment_out_in_yaml_file(self):
949        converted_policy_data = self._test_convert_json_to_yaml_file()
950        commented_default_rule = '''# test_rule1
951# GET  /test
952# Intended scope(s): system
953#"rule1_name": "rule:admin"
954
955'''
956        self.assertIn(commented_default_rule, converted_policy_data)
957
958    def test_overridden_rules_uncommented_in_yaml_file(self):
959        converted_policy_data = self._test_convert_json_to_yaml_file()
960        uncommented_overridden_rule = '''# rule2_name
961"rule2_name": "rule:overridden"
962
963'''
964        self.assertIn(uncommented_overridden_rule, converted_policy_data)
965
966    def test_existing_deprecated_rules_kept_uncommented_in_yaml_file(self):
967        converted_policy_data = self._test_convert_json_to_yaml_file()
968        existing_deprecated_rule_with_warning = '''# WARNING: Below rules are either deprecated rules
969# or extra rules in policy file, it is strongly
970# recommended to switch to new rules.
971"deprecated_rule1_name": "rule:admin"
972'''
973        self.assertIn(existing_deprecated_rule_with_warning,
974                      converted_policy_data)
975