1# Licensed under the Apache License, Version 2.0 (the "License"); you may 2# not use this file except in compliance with the License. You may obtain 3# a copy of the License at 4# 5# http://www.apache.org/licenses/LICENSE-2.0 6# # Unless required by applicable law or agreed to in writing, software 7# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT 8# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the 9# License for the specific language governing permissions and limitations 10# under the License. 11 12import operator 13from unittest import mock 14import warnings 15 16from oslo_config import cfg 17import stevedore 18import testtools 19import yaml 20 21from oslo_policy import generator 22from oslo_policy import policy 23from oslo_policy.tests import base 24from oslo_serialization import jsonutils 25 26 27OPTS = {'base_rules': [policy.RuleDefault('admin', 'is_admin:True', 28 description='Basic admin check'), 29 policy.DocumentedRuleDefault('owner', 30 ('project_id:%' 31 '(project_id)s'), 32 'This is a long ' 33 'description to check ' 34 'that line wrapping ' 35 'functions properly', 36 [{'path': '/foo/', 37 'method': 'GET'}, 38 {'path': '/test/', 39 'method': 'POST'}])], 40 'custom_field': [policy.RuleDefault('shared', 41 'field:networks:shared=True')], 42 'rules': [policy.RuleDefault('admin_or_owner', 43 'rule:admin or rule:owner')], 44 } 45 46 47class GenerateSampleYAMLTestCase(base.PolicyBaseTestCase): 48 def setUp(self): 49 super(GenerateSampleYAMLTestCase, self).setUp() 50 self.enforcer = policy.Enforcer(self.conf, policy_file='policy.yaml') 51 52 def test_generate_loadable_yaml(self): 53 extensions = [] 54 for name, opts in OPTS.items(): 55 ext = stevedore.extension.Extension(name=name, entry_point=None, 56 plugin=None, obj=opts) 57 extensions.append(ext) 58 test_mgr = stevedore.named.NamedExtensionManager.make_test_instance( 59 extensions=extensions, namespace=['base_rules', 'rules']) 60 61 output_file = self.get_config_file_fullname('policy.yaml') 62 with mock.patch('stevedore.named.NamedExtensionManager', 63 return_value=test_mgr) as mock_ext_mgr: 64 # generate sample-policy file with only rules 65 generator._generate_sample(['base_rules', 'rules'], output_file, 66 include_help=False) 67 mock_ext_mgr.assert_called_once_with( 68 'oslo.policy.policies', names=['base_rules', 'rules'], 69 on_load_failure_callback=generator.on_load_failure_callback, 70 invoke_on_load=True) 71 72 self.enforcer.load_rules() 73 74 self.assertIn('owner', self.enforcer.rules) 75 self.assertIn('admin', self.enforcer.rules) 76 self.assertIn('admin_or_owner', self.enforcer.rules) 77 self.assertEqual('project_id:%(project_id)s', 78 str(self.enforcer.rules['owner'])) 79 self.assertEqual('is_admin:True', str(self.enforcer.rules['admin'])) 80 self.assertEqual('(rule:admin or rule:owner)', 81 str(self.enforcer.rules['admin_or_owner'])) 82 83 def test_expected_content(self): 84 extensions = [] 85 for name, opts in OPTS.items(): 86 ext = stevedore.extension.Extension(name=name, entry_point=None, 87 plugin=None, obj=opts) 88 extensions.append(ext) 89 test_mgr = stevedore.named.NamedExtensionManager.make_test_instance( 90 extensions=extensions, namespace=['base_rules', 'rules']) 91 92 expected = '''# Basic admin check 93#"admin": "is_admin:True" 94 95# This is a long description to check that line wrapping functions 96# properly 97# GET /foo/ 98# POST /test/ 99#"owner": "project_id:%(project_id)s" 100 101#"shared": "field:networks:shared=True" 102 103#"admin_or_owner": "rule:admin or rule:owner" 104 105''' 106 output_file = self.get_config_file_fullname('policy.yaml') 107 with mock.patch('stevedore.named.NamedExtensionManager', 108 return_value=test_mgr) as mock_ext_mgr: 109 generator._generate_sample(['base_rules', 'rules'], output_file) 110 mock_ext_mgr.assert_called_once_with( 111 'oslo.policy.policies', names=['base_rules', 'rules'], 112 on_load_failure_callback=generator.on_load_failure_callback, 113 invoke_on_load=True) 114 115 with open(output_file, 'r') as written_file: 116 written_policy = written_file.read() 117 118 self.assertEqual(expected, written_policy) 119 120 def test_expected_content_stdout(self): 121 extensions = [] 122 for name, opts in OPTS.items(): 123 ext = stevedore.extension.Extension(name=name, entry_point=None, 124 plugin=None, obj=opts) 125 extensions.append(ext) 126 test_mgr = stevedore.named.NamedExtensionManager.make_test_instance( 127 extensions=extensions, namespace=['base_rules', 'rules']) 128 129 expected = '''# Basic admin check 130#"admin": "is_admin:True" 131 132# This is a long description to check that line wrapping functions 133# properly 134# GET /foo/ 135# POST /test/ 136#"owner": "project_id:%(project_id)s" 137 138#"shared": "field:networks:shared=True" 139 140#"admin_or_owner": "rule:admin or rule:owner" 141 142''' 143 stdout = self._capture_stdout() 144 with mock.patch('stevedore.named.NamedExtensionManager', 145 return_value=test_mgr) as mock_ext_mgr: 146 generator._generate_sample(['base_rules', 'rules'], 147 output_file=None) 148 mock_ext_mgr.assert_called_once_with( 149 'oslo.policy.policies', names=['base_rules', 'rules'], 150 on_load_failure_callback=generator.on_load_failure_callback, 151 invoke_on_load=True) 152 153 self.assertEqual(expected, stdout.getvalue()) 154 155 def test_policies_deprecated_for_removal(self): 156 rule = policy.RuleDefault( 157 name='foo:post_bar', 158 check_str='role:fizz', 159 description='Create a bar.', 160 deprecated_for_removal=True, 161 deprecated_reason='This policy is not used anymore', 162 deprecated_since='N' 163 ) 164 opts = {'rules': [rule]} 165 166 extensions = [] 167 for name, opts, in opts.items(): 168 ext = stevedore.extension.Extension(name=name, entry_point=None, 169 plugin=None, obj=opts) 170 extensions.append(ext) 171 172 test_mgr = stevedore.named.NamedExtensionManager.make_test_instance( 173 extensions=extensions, namespace=['rules'] 174 ) 175 176 expected = '''# DEPRECATED 177# "foo:post_bar" has been deprecated since N. 178# This policy is not used anymore 179# Create a bar. 180#"foo:post_bar": "role:fizz" 181 182''' 183 stdout = self._capture_stdout() 184 with mock.patch('stevedore.named.NamedExtensionManager', 185 return_value=test_mgr) as mock_ext_mgr: 186 generator._generate_sample(['rules'], output_file=None) 187 mock_ext_mgr.assert_called_once_with( 188 'oslo.policy.policies', names=['rules'], 189 on_load_failure_callback=generator.on_load_failure_callback, 190 invoke_on_load=True 191 ) 192 self.assertEqual(expected, stdout.getvalue()) 193 194 def test_deprecated_policies_are_aliased_to_new_names(self): 195 deprecated_rule = policy.DeprecatedRule( 196 name='foo:post_bar', 197 check_str='role:fizz', 198 deprecated_reason=( 199 'foo:post_bar is being removed in favor of foo:create_bar' 200 ), 201 deprecated_since='N', 202 ) 203 new_rule = policy.RuleDefault( 204 name='foo:create_bar', 205 check_str='role:fizz', 206 description='Create a bar.', 207 deprecated_rule=deprecated_rule, 208 ) 209 opts = {'rules': [new_rule]} 210 211 extensions = [] 212 for name, opts in opts.items(): 213 ext = stevedore.extension.Extension(name=name, entry_point=None, 214 plugin=None, obj=opts) 215 extensions.append(ext) 216 test_mgr = stevedore.named.NamedExtensionManager.make_test_instance( 217 extensions=extensions, namespace=['rules']) 218 219 expected = '''# Create a bar. 220#"foo:create_bar": "role:fizz" 221 222# DEPRECATED 223# "foo:post_bar":"role:fizz" has been deprecated since N in favor of 224# "foo:create_bar":"role:fizz". 225# foo:post_bar is being removed in favor of foo:create_bar 226"foo:post_bar": "rule:foo:create_bar" 227 228''' 229 stdout = self._capture_stdout() 230 with mock.patch('stevedore.named.NamedExtensionManager', 231 return_value=test_mgr) as mock_ext_mgr: 232 generator._generate_sample(['rules'], output_file=None) 233 mock_ext_mgr.assert_called_once_with( 234 'oslo.policy.policies', names=['rules'], 235 on_load_failure_callback=generator.on_load_failure_callback, 236 invoke_on_load=True 237 ) 238 self.assertEqual(expected, stdout.getvalue()) 239 240 def test_deprecated_policies_with_same_name(self): 241 deprecated_rule = policy.DeprecatedRule( 242 name='foo:create_bar', 243 check_str='role:old', 244 deprecated_reason=( 245 'role:fizz is a more sane default for foo:create_bar' 246 ), 247 deprecated_since='N', 248 ) 249 new_rule = policy.RuleDefault( 250 name='foo:create_bar', 251 check_str='role:fizz', 252 description='Create a bar.', 253 deprecated_rule=deprecated_rule, 254 ) 255 opts = {'rules': [new_rule]} 256 257 extensions = [] 258 for name, opts in opts.items(): 259 ext = stevedore.extension.Extension(name=name, entry_point=None, 260 plugin=None, obj=opts) 261 extensions.append(ext) 262 test_mgr = stevedore.named.NamedExtensionManager.make_test_instance( 263 extensions=extensions, namespace=['rules']) 264 265 expected = '''# Create a bar. 266#"foo:create_bar": "role:fizz" 267 268# DEPRECATED 269# "foo:create_bar":"role:old" has been deprecated since N in favor of 270# "foo:create_bar":"role:fizz". 271# role:fizz is a more sane default for foo:create_bar 272 273''' 274 stdout = self._capture_stdout() 275 with mock.patch('stevedore.named.NamedExtensionManager', 276 return_value=test_mgr) as mock_ext_mgr: 277 generator._generate_sample(['rules'], output_file=None) 278 mock_ext_mgr.assert_called_once_with( 279 'oslo.policy.policies', names=['rules'], 280 on_load_failure_callback=generator.on_load_failure_callback, 281 invoke_on_load=True 282 ) 283 self.assertEqual(expected, stdout.getvalue()) 284 285 def _test_formatting(self, description, expected): 286 rule = [policy.RuleDefault('admin', 'is_admin:True', 287 description=description)] 288 ext = stevedore.extension.Extension(name='check_rule', 289 entry_point=None, 290 plugin=None, obj=rule) 291 test_mgr = stevedore.named.NamedExtensionManager.make_test_instance( 292 extensions=[ext], namespace=['check_rule']) 293 294 output_file = self.get_config_file_fullname('policy.yaml') 295 with mock.patch('stevedore.named.NamedExtensionManager', 296 return_value=test_mgr) as mock_ext_mgr: 297 generator._generate_sample(['check_rule'], output_file) 298 mock_ext_mgr.assert_called_once_with( 299 'oslo.policy.policies', names=['check_rule'], 300 on_load_failure_callback=generator.on_load_failure_callback, 301 invoke_on_load=True) 302 303 with open(output_file, 'r') as written_file: 304 written_policy = written_file.read() 305 306 self.assertEqual(expected, written_policy) 307 308 def test_empty_line_formatting(self): 309 description = ('Check Summary \n' 310 '\n' 311 'This is a description to ' 312 'check that empty line has ' 313 'no white spaces.') 314 expected = """# Check Summary 315# 316# This is a description to check that empty line has no white spaces. 317#"admin": "is_admin:True" 318 319""" 320 321 self._test_formatting(description, expected) 322 323 def test_paragraph_formatting(self): 324 description = """ 325Here's a neat description with a paragraph. We want to make sure that it wraps 326properly. 327""" 328 expected = """# Here's a neat description with a paragraph. We want \ 329to make sure 330# that it wraps properly. 331#"admin": "is_admin:True" 332 333""" 334 335 self._test_formatting(description, expected) 336 337 def test_literal_block_formatting(self): 338 description = """Here's another description. 339 340 This one has a literal block. 341 These lines should be kept apart. 342 They should not be wrapped, even though they may be longer than 70 chars 343""" 344 expected = """# Here's another description. 345# 346# This one has a literal block. 347# These lines should be kept apart. 348# They should not be wrapped, even though they may be longer than 70 chars 349#"admin": "is_admin:True" 350 351""" 352 353 self._test_formatting(description, expected) 354 355 def test_invalid_formatting(self): 356 description = """Here's a broken description. 357 358We have some text... 359 Followed by a literal block without any spaces. 360 We don't support definition lists, so this is just wrong! 361""" 362 expected = """# Here's a broken description. 363# 364# We have some text... 365# 366# Followed by a literal block without any spaces. 367# We don't support definition lists, so this is just wrong! 368#"admin": "is_admin:True" 369 370""" 371 372 with warnings.catch_warnings(record=True) as warns: 373 self._test_formatting(description, expected) 374 self.assertEqual(1, len(warns)) 375 self.assertTrue(issubclass(warns[-1].category, FutureWarning)) 376 self.assertIn('Invalid policy description', str(warns[-1].message)) 377 378 379class GenerateSampleJSONTestCase(base.PolicyBaseTestCase): 380 def setUp(self): 381 super(GenerateSampleJSONTestCase, self).setUp() 382 self.enforcer = policy.Enforcer(self.conf, policy_file='policy.json') 383 384 def test_generate_loadable_json(self): 385 extensions = [] 386 for name, opts in OPTS.items(): 387 ext = stevedore.extension.Extension(name=name, entry_point=None, 388 plugin=None, obj=opts) 389 extensions.append(ext) 390 test_mgr = stevedore.named.NamedExtensionManager.make_test_instance( 391 extensions=extensions, namespace=['base_rules', 'rules']) 392 393 output_file = self.get_config_file_fullname('policy.json') 394 with mock.patch('stevedore.named.NamedExtensionManager', 395 return_value=test_mgr) as mock_ext_mgr: 396 # generate sample-policy file with only rules 397 generator._generate_sample(['base_rules', 'rules'], output_file, 398 output_format='json', 399 include_help=False) 400 mock_ext_mgr.assert_called_once_with( 401 'oslo.policy.policies', names=['base_rules', 'rules'], 402 on_load_failure_callback=generator.on_load_failure_callback, 403 invoke_on_load=True) 404 405 self.enforcer.load_rules() 406 407 self.assertIn('owner', self.enforcer.rules) 408 self.assertIn('admin', self.enforcer.rules) 409 self.assertIn('admin_or_owner', self.enforcer.rules) 410 self.assertEqual('project_id:%(project_id)s', 411 str(self.enforcer.rules['owner'])) 412 self.assertEqual('is_admin:True', str(self.enforcer.rules['admin'])) 413 self.assertEqual('(rule:admin or rule:owner)', 414 str(self.enforcer.rules['admin_or_owner'])) 415 416 def test_expected_content(self): 417 extensions = [] 418 for name, opts in OPTS.items(): 419 ext = stevedore.extension.Extension(name=name, entry_point=None, 420 plugin=None, obj=opts) 421 extensions.append(ext) 422 test_mgr = stevedore.named.NamedExtensionManager.make_test_instance( 423 extensions=extensions, namespace=['base_rules', 'rules']) 424 425 expected = '''{ 426 "admin": "is_admin:True", 427 "owner": "project_id:%(project_id)s", 428 "shared": "field:networks:shared=True", 429 "admin_or_owner": "rule:admin or rule:owner" 430} 431''' 432 output_file = self.get_config_file_fullname('policy.json') 433 with mock.patch('stevedore.named.NamedExtensionManager', 434 return_value=test_mgr) as mock_ext_mgr: 435 generator._generate_sample(['base_rules', 'rules'], 436 output_file=output_file, 437 output_format='json') 438 mock_ext_mgr.assert_called_once_with( 439 'oslo.policy.policies', names=['base_rules', 'rules'], 440 on_load_failure_callback=generator.on_load_failure_callback, 441 invoke_on_load=True) 442 443 with open(output_file, 'r') as written_file: 444 written_policy = written_file.read() 445 446 self.assertEqual(expected, written_policy) 447 448 def test_expected_content_stdout(self): 449 extensions = [] 450 for name, opts in OPTS.items(): 451 ext = stevedore.extension.Extension(name=name, entry_point=None, 452 plugin=None, obj=opts) 453 extensions.append(ext) 454 test_mgr = stevedore.named.NamedExtensionManager.make_test_instance( 455 extensions=extensions, namespace=['base_rules', 'rules']) 456 457 expected = '''{ 458 "admin": "is_admin:True", 459 "owner": "project_id:%(project_id)s", 460 "shared": "field:networks:shared=True", 461 "admin_or_owner": "rule:admin or rule:owner" 462} 463''' 464 stdout = self._capture_stdout() 465 with mock.patch('stevedore.named.NamedExtensionManager', 466 return_value=test_mgr) as mock_ext_mgr: 467 generator._generate_sample(['base_rules', 'rules'], 468 output_file=None, 469 output_format='json') 470 mock_ext_mgr.assert_called_once_with( 471 'oslo.policy.policies', names=['base_rules', 'rules'], 472 on_load_failure_callback=generator.on_load_failure_callback, 473 invoke_on_load=True) 474 475 self.assertEqual(expected, stdout.getvalue()) 476 477 @mock.patch.object(generator, 'LOG') 478 def test_generate_json_file_log_warning(self, mock_log): 479 extensions = [] 480 for name, opts in OPTS.items(): 481 ext = stevedore.extension.Extension(name=name, entry_point=None, 482 plugin=None, obj=opts) 483 extensions.append(ext) 484 test_mgr = stevedore.named.NamedExtensionManager.make_test_instance( 485 extensions=extensions, namespace=['base_rules', 'rules']) 486 487 output_file = self.get_config_file_fullname('policy.json') 488 with mock.patch('stevedore.named.NamedExtensionManager', 489 return_value=test_mgr): 490 generator._generate_sample(['base_rules', 'rules'], output_file, 491 output_format='json') 492 mock_log.warning.assert_any_call(policy.WARN_JSON) 493 494 495class GeneratorRaiseErrorTestCase(testtools.TestCase): 496 def test_generator_raises_error(self): 497 """Verifies that errors from extension manager are not suppressed.""" 498 class FakeException(Exception): 499 pass 500 501 class FakeEP(object): 502 503 def __init__(self): 504 self.name = 'callback_is_expected' 505 self.require = self.resolve 506 self.load = self.resolve 507 508 def resolve(self, *args, **kwargs): 509 raise FakeException() 510 511 fake_ep = FakeEP() 512 with mock.patch('stevedore.named.NamedExtensionManager', 513 side_effect=FakeException()): 514 self.assertRaises(FakeException, generator._generate_sample, 515 fake_ep.name) 516 517 def test_generator_call_with_no_arguments_raises_error(self): 518 testargs = ['oslopolicy-sample-generator'] 519 with mock.patch('sys.argv', testargs): 520 local_conf = cfg.ConfigOpts() 521 self.assertRaises(cfg.RequiredOptError, generator.generate_sample, 522 [], local_conf) 523 524 525class GeneratePolicyTestCase(base.PolicyBaseTestCase): 526 def setUp(self): 527 super(GeneratePolicyTestCase, self).setUp() 528 529 def test_merged_rules(self): 530 extensions = [] 531 for name, opts in OPTS.items(): 532 ext = stevedore.extension.Extension(name=name, entry_point=None, 533 plugin=None, obj=opts) 534 extensions.append(ext) 535 test_mgr = stevedore.named.NamedExtensionManager.make_test_instance( 536 extensions=extensions, namespace=['base_rules', 'rules']) 537 538 # Write the policy file for an enforcer to load 539 sample_file = self.get_config_file_fullname('policy-sample.yaml') 540 with mock.patch('stevedore.named.NamedExtensionManager', 541 return_value=test_mgr): 542 # generate sample-policy file with only rules 543 generator._generate_sample(['base_rules', 'rules'], sample_file, 544 include_help=False) 545 546 enforcer = policy.Enforcer(self.conf, policy_file='policy-sample.yaml') 547 # register an opt defined in the file 548 enforcer.register_default(policy.RuleDefault('admin', 549 'is_admin:False')) 550 # register a new opt 551 enforcer.register_default(policy.RuleDefault('foo', 'role:foo')) 552 553 # Mock out stevedore to return the configured enforcer 554 ext = stevedore.extension.Extension(name='testing', entry_point=None, 555 plugin=None, obj=enforcer) 556 test_mgr = stevedore.named.NamedExtensionManager.make_test_instance( 557 extensions=[ext], namespace='testing') 558 559 # Generate a merged file 560 merged_file = self.get_config_file_fullname('policy-merged.yaml') 561 with mock.patch('stevedore.named.NamedExtensionManager', 562 return_value=test_mgr) as mock_ext_mgr: 563 generator._generate_policy(namespace='testing', 564 output_file=merged_file) 565 mock_ext_mgr.assert_called_once_with( 566 'oslo.policy.enforcer', names=['testing'], 567 on_load_failure_callback=generator.on_load_failure_callback, 568 invoke_on_load=True) 569 570 # load the merged file with a new enforcer 571 merged_enforcer = policy.Enforcer(self.conf, 572 policy_file='policy-merged.yaml') 573 merged_enforcer.load_rules() 574 for rule in ['admin', 'owner', 'admin_or_owner', 'foo']: 575 self.assertIn(rule, merged_enforcer.rules) 576 577 self.assertEqual('is_admin:True', str(merged_enforcer.rules['admin'])) 578 self.assertEqual('role:foo', str(merged_enforcer.rules['foo'])) 579 580 581class ListRedundantTestCase(base.PolicyBaseTestCase): 582 def setUp(self): 583 super(ListRedundantTestCase, self).setUp() 584 585 @mock.patch('warnings.warn') 586 def test_matched_rules(self, mock_warn): 587 extensions = [] 588 for name, opts in OPTS.items(): 589 ext = stevedore.extension.Extension(name=name, entry_point=None, 590 plugin=None, obj=opts) 591 extensions.append(ext) 592 test_mgr = stevedore.named.NamedExtensionManager.make_test_instance( 593 extensions=extensions, namespace=['base_rules', 'rules']) 594 595 # Write the policy file for an enforcer to load 596 sample_file = self.get_config_file_fullname('policy-sample.yaml') 597 with mock.patch('stevedore.named.NamedExtensionManager', 598 return_value=test_mgr): 599 # generate sample-policy file with only rules 600 generator._generate_sample(['base_rules', 'rules'], sample_file, 601 include_help=False) 602 603 enforcer = policy.Enforcer(self.conf, policy_file='policy-sample.yaml') 604 # register opts that match those defined in policy-sample.yaml 605 enforcer.register_default(policy.RuleDefault('admin', 'is_admin:True')) 606 enforcer.register_default( 607 policy.RuleDefault('owner', 'project_id:%(project_id)s')) 608 # register a new opt 609 deprecated_rule = policy.DeprecatedRule( 610 name='old_foo', 611 check_str='role:bar', 612 deprecated_reason='reason', 613 deprecated_since='T' 614 ) 615 enforcer.register_default( 616 policy.RuleDefault( 617 name='foo', 618 check_str='role:foo', 619 deprecated_rule=deprecated_rule, 620 ), 621 ) 622 623 # Mock out stevedore to return the configured enforcer 624 ext = stevedore.extension.Extension(name='testing', entry_point=None, 625 plugin=None, obj=enforcer) 626 test_mgr = stevedore.named.NamedExtensionManager.make_test_instance( 627 extensions=[ext], namespace='testing') 628 629 stdout = self._capture_stdout() 630 with mock.patch('stevedore.named.NamedExtensionManager', 631 return_value=test_mgr) as mock_ext_mgr: 632 generator._list_redundant(namespace='testing') 633 mock_ext_mgr.assert_called_once_with( 634 'oslo.policy.enforcer', names=['testing'], 635 on_load_failure_callback=generator.on_load_failure_callback, 636 invoke_on_load=True) 637 638 matches = [line.split(': ', 1) for 639 line in stdout.getvalue().splitlines()] 640 matches.sort(key=operator.itemgetter(0)) 641 642 # Should be 'admin' 643 opt0 = matches[0] 644 self.assertEqual('"admin"', opt0[0]) 645 self.assertEqual('"is_admin:True"', opt0[1]) 646 647 # Should be 'owner' 648 opt1 = matches[1] 649 self.assertEqual('"owner"', opt1[0]) 650 self.assertEqual('"project_id:%(project_id)s"', opt1[1]) 651 652 self.assertFalse(mock_warn.called, 653 'Deprecation warnings not suppressed.') 654 655 656class UpgradePolicyTestCase(base.PolicyBaseTestCase): 657 def setUp(self): 658 super(UpgradePolicyTestCase, self).setUp() 659 policy_json_contents = jsonutils.dumps({ 660 "deprecated_name": "rule:admin" 661 }) 662 self.create_config_file('policy.json', policy_json_contents) 663 deprecated_policy = policy.DeprecatedRule( 664 name='deprecated_name', 665 check_str='rule:admin', 666 deprecated_reason='test', 667 deprecated_since='Stein', 668 ) 669 self.new_policy = policy.DocumentedRuleDefault( 670 name='new_policy_name', 671 check_str='rule:admin', 672 description='test_policy', 673 operations=[{'path': '/test', 'method': 'GET'}], 674 deprecated_rule=deprecated_policy, 675 ) 676 self.extensions = [] 677 ext = stevedore.extension.Extension(name='test_upgrade', 678 entry_point=None, 679 plugin=None, 680 obj=[self.new_policy]) 681 self.extensions.append(ext) 682 # Just used for cli opt parsing 683 self.local_conf = cfg.ConfigOpts() 684 685 def test_upgrade_policy_json_file(self): 686 test_mgr = stevedore.named.NamedExtensionManager.make_test_instance( 687 extensions=self.extensions, namespace='test_upgrade') 688 with mock.patch('stevedore.named.NamedExtensionManager', 689 return_value=test_mgr): 690 testargs = ['olsopolicy-policy-upgrade', 691 '--policy', 692 self.get_config_file_fullname('policy.json'), 693 '--namespace', 'test_upgrade', 694 '--output-file', 695 self.get_config_file_fullname('new_policy.json'), 696 '--format', 'json'] 697 with mock.patch('sys.argv', testargs): 698 generator.upgrade_policy(conf=self.local_conf) 699 new_file = self.get_config_file_fullname('new_policy.json') 700 with open(new_file, 'r') as fh: 701 new_policy = jsonutils.loads(fh.read()) 702 self.assertIsNotNone(new_policy.get('new_policy_name')) 703 self.assertIsNone(new_policy.get('deprecated_name')) 704 705 @mock.patch.object(generator, 'LOG') 706 def test_upgrade_policy_json_file_log_warning(self, mock_log): 707 test_mgr = stevedore.named.NamedExtensionManager.make_test_instance( 708 extensions=self.extensions, namespace='test_upgrade') 709 with mock.patch('stevedore.named.NamedExtensionManager', 710 return_value=test_mgr): 711 testargs = ['olsopolicy-policy-upgrade', 712 '--policy', 713 self.get_config_file_fullname('policy.json'), 714 '--namespace', 'test_upgrade', 715 '--output-file', 716 self.get_config_file_fullname('new_policy.json'), 717 '--format', 'json'] 718 with mock.patch('sys.argv', testargs): 719 generator.upgrade_policy(conf=self.local_conf) 720 mock_log.warning.assert_any_call(policy.WARN_JSON) 721 722 def test_upgrade_policy_yaml_file(self): 723 test_mgr = stevedore.named.NamedExtensionManager.make_test_instance( 724 extensions=self.extensions, namespace='test_upgrade') 725 with mock.patch('stevedore.named.NamedExtensionManager', 726 return_value=test_mgr): 727 testargs = ['olsopolicy-policy-upgrade', 728 '--policy', 729 self.get_config_file_fullname('policy.json'), 730 '--namespace', 'test_upgrade', 731 '--output-file', 732 self.get_config_file_fullname('new_policy.yaml'), 733 '--format', 'yaml'] 734 with mock.patch('sys.argv', testargs): 735 generator.upgrade_policy(conf=self.local_conf) 736 new_file = self.get_config_file_fullname('new_policy.yaml') 737 with open(new_file, 'r') as fh: 738 new_policy = yaml.safe_load(fh) 739 self.assertIsNotNone(new_policy.get('new_policy_name')) 740 self.assertIsNone(new_policy.get('deprecated_name')) 741 742 def test_upgrade_policy_json_stdout(self): 743 test_mgr = stevedore.named.NamedExtensionManager.make_test_instance( 744 extensions=self.extensions, namespace='test_upgrade') 745 stdout = self._capture_stdout() 746 with mock.patch('stevedore.named.NamedExtensionManager', 747 return_value=test_mgr): 748 testargs = ['olsopolicy-policy-upgrade', 749 '--policy', 750 self.get_config_file_fullname('policy.json'), 751 '--namespace', 'test_upgrade', 752 '--format', 'json'] 753 with mock.patch('sys.argv', testargs): 754 generator.upgrade_policy(conf=self.local_conf) 755 expected = '''{ 756 "new_policy_name": "rule:admin" 757}''' 758 self.assertEqual(expected, stdout.getvalue()) 759 760 def test_upgrade_policy_yaml_stdout(self): 761 test_mgr = stevedore.named.NamedExtensionManager.make_test_instance( 762 extensions=self.extensions, namespace='test_upgrade') 763 stdout = self._capture_stdout() 764 with mock.patch('stevedore.named.NamedExtensionManager', 765 return_value=test_mgr): 766 testargs = ['olsopolicy-policy-upgrade', 767 '--policy', 768 self.get_config_file_fullname('policy.json'), 769 '--namespace', 'test_upgrade', 770 '--format', 'yaml'] 771 with mock.patch('sys.argv', testargs): 772 generator.upgrade_policy(conf=self.local_conf) 773 expected = '''new_policy_name: rule:admin 774''' 775 self.assertEqual(expected, stdout.getvalue()) 776 777 778@mock.patch('stevedore.named.NamedExtensionManager') 779class GetEnforcerTestCase(base.PolicyBaseTestCase): 780 def test_get_enforcer(self, mock_manager): 781 mock_instance = mock.MagicMock() 782 mock_instance.__contains__.return_value = True 783 mock_manager.return_value = mock_instance 784 mock_item = mock.Mock() 785 mock_item.obj = 'test' 786 mock_instance.__getitem__.return_value = mock_item 787 self.assertEqual('test', generator._get_enforcer('foo')) 788 789 def test_get_enforcer_missing(self, mock_manager): 790 mock_instance = mock.MagicMock() 791 mock_instance.__contains__.return_value = False 792 mock_manager.return_value = mock_instance 793 self.assertRaises(KeyError, generator._get_enforcer, 'nonexistent') 794 795 796class ValidatorTestCase(base.PolicyBaseTestCase): 797 def _get_test_enforcer(self): 798 test_rules = [policy.RuleDefault('foo', 'foo:bar=baz'), 799 policy.RuleDefault('bar', 'bar:foo=baz')] 800 enforcer = policy.Enforcer(self.conf) 801 enforcer.register_defaults(test_rules) 802 return enforcer 803 804 def _test_policy(self, rule, success=False, missing_file=False): 805 policy_file = self.get_config_file_fullname('test.yaml') 806 if missing_file: 807 policy_file = 'bogus.yaml' 808 self.create_config_file('test.yaml', rule) 809 self.create_config_file('test.conf', 810 '[oslo_policy]\npolicy_file=%s' % policy_file) 811 # Reparse now that we've created our configs 812 self.conf(args=['--config-dir', self.config_dir]) 813 814 with mock.patch('oslo_policy.generator._get_enforcer') as ge: 815 ge.return_value = self._get_test_enforcer() 816 result = generator._validate_policy('test') 817 if success: 818 self.assertEqual(0, result) 819 else: 820 self.assertEqual(1, result) 821 822 def test_success(self): 823 self._test_policy('foo: rule:bar', success=True) 824 825 def test_cyclical_reference(self): 826 self._test_policy('foo: rule:bar\nbar: rule:foo') 827 828 def test_invalid_syntax(self): 829 self._test_policy('foo: (bar))') 830 831 def test_false_okay(self): 832 self._test_policy('foo: !', success=True) 833 834 def test_reference_nonexistent(self): 835 self._test_policy('foo: rule:baz') 836 837 def test_nonexistent(self): 838 self._test_policy('baz: rule:foo') 839 840 def test_missing_policy_file(self): 841 self._test_policy('', missing_file=True) 842 843 844class ConvertJsonToYamlTestCase(base.PolicyBaseTestCase): 845 def setUp(self): 846 super(ConvertJsonToYamlTestCase, self).setUp() 847 policy_json_contents = jsonutils.dumps({ 848 "rule1_name": "rule:admin", 849 "rule2_name": "rule:overridden", 850 "deprecated_rule1_name": "rule:admin" 851 }) 852 self.create_config_file('policy.json', policy_json_contents) 853 self.output_file_path = self.get_config_file_fullname( 854 'converted_policy.yaml') 855 deprecated_policy = policy.DeprecatedRule( 856 name='deprecated_rule1_name', 857 check_str='rule:admin', 858 deprecated_reason='testing', 859 deprecated_since='ussuri', 860 ) 861 self.registered_policy = [ 862 policy.DocumentedRuleDefault( 863 name='rule1_name', 864 check_str='rule:admin', 865 description='test_rule1', 866 operations=[{'path': '/test', 'method': 'GET'}], 867 deprecated_rule=deprecated_policy, 868 scope_types=['system'], 869 ), 870 policy.RuleDefault( 871 name='rule2_name', 872 check_str='rule:admin', 873 ) 874 ] 875 self.extensions = [] 876 ext = stevedore.extension.Extension(name='test', 877 entry_point=None, 878 plugin=None, 879 obj=self.registered_policy) 880 self.extensions.append(ext) 881 # Just used for cli opt parsing 882 self.local_conf = cfg.ConfigOpts() 883 884 self.expected = '''# test_rule1 885# GET /test 886# Intended scope(s): system 887#"rule1_name": "rule:admin" 888 889# rule2_name 890"rule2_name": "rule:overridden" 891 892# WARNING: Below rules are either deprecated rules 893# or extra rules in policy file, it is strongly 894# recommended to switch to new rules. 895"deprecated_rule1_name": "rule:admin" 896''' 897 898 def _is_yaml(self, data): 899 is_yaml = False 900 try: 901 jsonutils.loads(data) 902 except ValueError: 903 try: 904 yaml.safe_load(data) 905 is_yaml = True 906 except yaml.scanner.ScannerError: 907 pass 908 return is_yaml 909 910 def _test_convert_json_to_yaml_file(self, output_to_file=True): 911 test_mgr = stevedore.named.NamedExtensionManager.make_test_instance( 912 extensions=self.extensions, namespace='test') 913 converted_policy_data = None 914 with mock.patch('stevedore.named.NamedExtensionManager', 915 return_value=test_mgr): 916 testargs = ['oslopolicy-convert-json-to-yaml', 917 '--namespace', 'test', 918 '--policy-file', 919 self.get_config_file_fullname('policy.json')] 920 if output_to_file: 921 testargs.extend(['--output-file', 922 self.output_file_path]) 923 with mock.patch('sys.argv', testargs): 924 generator.convert_policy_json_to_yaml(conf=self.local_conf) 925 if output_to_file: 926 with open(self.output_file_path, 'r') as fh: 927 converted_policy_data = fh.read() 928 return converted_policy_data 929 930 def test_convert_json_to_yaml_file(self): 931 converted_policy_data = self._test_convert_json_to_yaml_file() 932 self.assertTrue(self._is_yaml(converted_policy_data)) 933 self.assertEqual(self.expected, converted_policy_data) 934 935 def test_convert_policy_to_stdout(self): 936 stdout = self._capture_stdout() 937 self._test_convert_json_to_yaml_file(output_to_file=False) 938 self.assertEqual(self.expected, stdout.getvalue()) 939 940 def test_converted_yaml_is_loadable(self): 941 self._test_convert_json_to_yaml_file() 942 enforcer = policy.Enforcer(self.conf, 943 policy_file=self.output_file_path) 944 enforcer.load_rules() 945 for rule in ['rule2_name', 'deprecated_rule1_name']: 946 self.assertIn(rule, enforcer.rules) 947 948 def test_default_rules_comment_out_in_yaml_file(self): 949 converted_policy_data = self._test_convert_json_to_yaml_file() 950 commented_default_rule = '''# test_rule1 951# GET /test 952# Intended scope(s): system 953#"rule1_name": "rule:admin" 954 955''' 956 self.assertIn(commented_default_rule, converted_policy_data) 957 958 def test_overridden_rules_uncommented_in_yaml_file(self): 959 converted_policy_data = self._test_convert_json_to_yaml_file() 960 uncommented_overridden_rule = '''# rule2_name 961"rule2_name": "rule:overridden" 962 963''' 964 self.assertIn(uncommented_overridden_rule, converted_policy_data) 965 966 def test_existing_deprecated_rules_kept_uncommented_in_yaml_file(self): 967 converted_policy_data = self._test_convert_json_to_yaml_file() 968 existing_deprecated_rule_with_warning = '''# WARNING: Below rules are either deprecated rules 969# or extra rules in policy file, it is strongly 970# recommended to switch to new rules. 971"deprecated_rule1_name": "rule:admin" 972''' 973 self.assertIn(existing_deprecated_rule_with_warning, 974 converted_policy_data) 975