# Copyright 2011 OpenStack Foundation
#
#    Licensed under the Apache License, Version 2.0 (the "License"); you may
#    not use this file except in compliance with the License. You may obtain
#    a copy of the License at
#
#         http://www.apache.org/licenses/LICENSE-2.0
#
#    Unless required by applicable law or agreed to in writing, software
#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
#    License for the specific language governing permissions and limitations
#    under the License.

"""Unit tests for the oslo_rootwrap filters, wrapper, cmd and daemon."""

import configparser
import logging
import logging.handlers
import os
import shutil
import tempfile
from unittest import mock
import uuid

import fixtures
import testtools

from oslo_rootwrap import cmd
from oslo_rootwrap import daemon
from oslo_rootwrap import filters
from oslo_rootwrap import subprocess
from oslo_rootwrap import wrapper


class RootwrapLoaderTestCase(testtools.TestCase):
    """Tests for wrapper.load_filters()."""

    def test_privsep_in_loader(self):
        privsep = ["privsep-helper", "--context", "foo"]
        filterlist = wrapper.load_filters([])

        # Mock out get_exec so the resolved path is the fixed fake value
        # asserted below (presumably so the test does not depend on a
        # privsep-helper binary being installed on the test host).
        with mock.patch.object(filters.CommandFilter, 'get_exec') as ge:
            ge.return_value = "/fake/privsep-helper"
            filtermatch = wrapper.match_filter(filterlist, privsep)

            self.assertIsNotNone(filtermatch)
            self.assertEqual(["/fake/privsep-helper", "--context", "foo"],
                             filtermatch.get_command(privsep))

    def test_strict_switched_off_in_configparser(self):
        temp_dir = self.useFixture(fixtures.TempDir()).path
        os.mkdir(os.path.join(temp_dir, 'nested'))
        temp_file = os.path.join(temp_dir, 'test.conf')
        # The same option is written twice on purpose: loading must not fail
        # on the duplicate key.
        with open(temp_file, 'w') as f:
            f.write("""[Filters]
privsep: PathFilter, privsep-helper, root
privsep: PathFilter, privsep-helper, root
""")
        filterlist = wrapper.load_filters([temp_dir])
        self.assertIsNotNone(filterlist)


class RootwrapTestCase(testtools.TestCase):
    """Tests for the filter classes, filter matching and RootwrapConfig."""

    # Location of the "ip" binary handed to the Ip*Filter tests.
    _ip = '/sbin/ip' if os.path.exists('/sbin/ip') else '/bin/ip'

    def setUp(self):
        super().setUp()
        self.filters = [
            filters.RegExpFilter("/bin/ls", "root", 'ls', '/[a-z]+'),
            filters.CommandFilter("/usr/bin/foo_bar_not_exist", "root"),
            filters.RegExpFilter("/bin/cat", "root", 'cat', '/[a-z]+'),
            filters.CommandFilter("/nonexistent/cat", "root"),
            filters.CommandFilter("/bin/cat", "root")  # Keep this one last
        ]

    def test_CommandFilter(self):
        f = filters.CommandFilter("sleep", 'root', '10')
        self.assertFalse(f.match(["sleep2"]))

        # verify that any arguments are accepted
        self.assertTrue(f.match(["sleep"]))
        self.assertTrue(f.match(["sleep", "anything"]))
        self.assertTrue(f.match(["sleep", "10"]))
        f = filters.CommandFilter("sleep", 'root')
        self.assertTrue(f.match(["sleep", "10"]))

    def test_empty_commandfilter(self):
        f = filters.CommandFilter("sleep", "root")
        self.assertFalse(f.match([]))
        self.assertFalse(f.match(None))

    def test_empty_regexpfilter(self):
        f = filters.RegExpFilter("sleep", "root", "sleep")
        self.assertFalse(f.match([]))
        self.assertFalse(f.match(None))

    def test_empty_invalid_regexpfilter(self):
        f = filters.RegExpFilter("sleep", "root")
        self.assertFalse(f.match(["anything"]))
        self.assertFalse(f.match([]))

    def test_RegExpFilter_match(self):
        usercmd = ["ls", "/root"]
        filtermatch = wrapper.match_filter(self.filters, usercmd)
        self.assertIsNotNone(filtermatch)
        self.assertEqual(["/bin/ls", "/root"],
                         filtermatch.get_command(usercmd))

    def test_RegExpFilter_reject(self):
        usercmd = ["ls", "root"]
        self.assertRaises(wrapper.NoFilterMatched,
                          wrapper.match_filter, self.filters, usercmd)

    def test_missing_command(self):
        valid_but_missing = ["foo_bar_not_exist"]
        invalid = ["foo_bar_not_exist_and_not_matched"]
        self.assertRaises(wrapper.FilterMatchNotExecutable,
                          wrapper.match_filter,
                          self.filters, valid_but_missing)
        self.assertRaises(wrapper.NoFilterMatched,
                          wrapper.match_filter, self.filters, invalid)

    def _test_EnvFilter_as_DnsMasq(self, config_file_arg):
        """Check an EnvFilter wrapping dnsmasq for one config variable name.

        NOTE(review): nothing in this file calls this helper -- confirm
        whether its test_* callers were removed on purpose.
        """
        usercmd = ['env', config_file_arg + '=A', 'NETWORK_ID=foobar',
                   'dnsmasq', 'foo']
        f = filters.EnvFilter("env", "root", config_file_arg + '=A',
                              'NETWORK_ID=', "/usr/bin/dnsmasq")
        self.assertTrue(f.match(usercmd))
        self.assertEqual(['/usr/bin/dnsmasq', 'foo'], f.get_command(usercmd))
        env = f.get_environment(usercmd)
        self.assertEqual('A', env.get(config_file_arg))
        self.assertEqual('foobar', env.get('NETWORK_ID'))

    def test_EnvFilter(self):
        envset = ['A=/some/thing', 'B=somethingelse']
        envcmd = ['env'] + envset
        realcmd = ['sleep', '10']
        usercmd = envcmd + realcmd

        f = filters.EnvFilter("env", "root", "A=", "B=ignored", "sleep")
        # accept with leading env
        self.assertTrue(f.match(envcmd + ["sleep"]))
        # accept without leading env
        self.assertTrue(f.match(envset + ["sleep"]))

        # any other command does not match
        self.assertFalse(f.match(envcmd + ["sleep2"]))
        self.assertFalse(f.match(envset + ["sleep2"]))

        # accept any trailing arguments
        self.assertTrue(f.match(usercmd))

        # require given environment variables to match
        # (the extra variable is appended to a flat argv; nesting envcmd
        # inside another list would never exercise this case)
        self.assertFalse(f.match(envcmd + ['C=ELSE']))
        self.assertFalse(f.match(['env', 'C=xx']))
        self.assertFalse(f.match(['env', 'A=xx']))

        # require env command to be given
        # (otherwise CommandFilters should match)
        self.assertFalse(f.match(realcmd))
        # require command to match
        self.assertFalse(f.match(envcmd))
        self.assertFalse(f.match(envcmd[1:]))

        # ensure that the env command is stripped when executing
        self.assertEqual(realcmd, f.exec_args(usercmd))
        env = f.get_environment(usercmd)
        # check that environment variables are set
        self.assertEqual('/some/thing', env.get('A'))
        self.assertEqual('somethingelse', env.get('B'))
        self.assertNotIn('sleep', env.keys())

    def test_EnvFilter_without_leading_env(self):
        envset = ['A=/some/thing', 'B=somethingelse']
        envcmd = ['env'] + envset
        realcmd = ['sleep', '10']

        f = filters.EnvFilter("sleep", "root", "A=", "B=ignored")

        # accept without leading env
        self.assertTrue(f.match(envset + ["sleep"]))

        self.assertEqual(realcmd, f.get_command(envcmd + realcmd))
        self.assertEqual(realcmd, f.get_command(envset + realcmd))

        env = f.get_environment(envset + realcmd)
        # check that environment variables are set
        self.assertEqual('/some/thing', env.get('A'))
        self.assertEqual('somethingelse', env.get('B'))
        self.assertNotIn('sleep', env.keys())

    def test_KillFilter(self):
        if not os.path.exists("/proc/%d" % os.getpid()):
            self.skipTest("Test requires /proc filesystem (procfs)")
        # A long-lived "cat" child gives the filters a real PID to inspect.
        p = subprocess.Popen(["cat"], stdin=subprocess.PIPE,
                             stdout=subprocess.PIPE,
                             stderr=subprocess.STDOUT)
        try:
            f = filters.KillFilter("root", "/bin/cat", "-9", "-HUP")
            f2 = filters.KillFilter("root", "/usr/bin/cat", "-9", "-HUP")
            usercmd = ['kill', '-ALRM', p.pid]
            # Incorrect signal should fail
            self.assertFalse(f.match(usercmd) or f2.match(usercmd))
            usercmd = ['kill', p.pid]
            # Providing no signal should fail
            self.assertFalse(f.match(usercmd) or f2.match(usercmd))
            # Providing matching signal should be allowed
            usercmd = ['kill', '-9', p.pid]
            self.assertTrue(f.match(usercmd) or f2.match(usercmd))

            f = filters.KillFilter("root", "/bin/cat")
            f2 = filters.KillFilter("root", "/usr/bin/cat")
            usercmd = ['kill', os.getpid()]
            # Our own PID does not match /bin/cat, so it should fail
            self.assertFalse(f.match(usercmd) or f2.match(usercmd))
            usercmd = ['kill', 999999]
            # Nonexistent PID should fail
            self.assertFalse(f.match(usercmd) or f2.match(usercmd))
            usercmd = ['kill', p.pid]
            # Providing no signal should work
            self.assertTrue(f.match(usercmd) or f2.match(usercmd))

            # verify that relative paths are matched against $PATH
            f = filters.KillFilter("root", "cat")
            # Our own PID does not match so it should fail
            usercmd = ['kill', os.getpid()]
            self.assertFalse(f.match(usercmd))
            # Filter should find cat in /bin or /usr/bin
            usercmd = ['kill', p.pid]
            self.assertTrue(f.match(usercmd))
            # Filter shouldn't be able to find binary in $PATH, so fail
            with fixtures.EnvironmentVariable("PATH", "/foo:/bar"):
                self.assertFalse(f.match(usercmd))
            # ensure that unset $PATH is not causing an exception
            with fixtures.EnvironmentVariable("PATH"):
                self.assertFalse(f.match(usercmd))
        finally:
            # Terminate the "cat" process and wait for it to finish
            p.terminate()
            p.wait()

    def test_KillFilter_no_raise(self):
        """Makes sure ValueError from bug 926412 is gone."""
        f = filters.KillFilter("root", "")
        # Providing anything other than kill should be False
        usercmd = ['notkill', 999999]
        self.assertFalse(f.match(usercmd))
        # Providing something that is not a pid should be False
        usercmd = ['kill', 'notapid']
        self.assertFalse(f.match(usercmd))
        # no arguments should also be fine
        self.assertFalse(f.match([]))
        self.assertFalse(f.match(None))

    def test_KillFilter_deleted_exe(self):
        """Makes sure deleted exe's are killed correctly."""
        command = "/bin/commandddddd"
        f = filters.KillFilter("root", command)
        usercmd = ['kill', 1234]

        def fake_isfile(path):
            return path == command

        # Providing no signal should work
        with mock.patch('os.readlink') as readlink:
            readlink.return_value = command + ' (deleted)'
            with mock.patch('os.path.isfile') as isfile:
                isfile.side_effect = fake_isfile
                self.assertTrue(f.match(usercmd))

    @mock.patch('os.readlink')
    @mock.patch('os.path.isfile')
    def test_KillFilter_upgraded_exe(self, mock_isfile, mock_readlink):
        """Makes sure upgraded exe's are killed correctly."""
        f = filters.KillFilter("root", "/bin/commandddddd")
        command = "/bin/commandddddd"
        usercmd = ['kill', 1234]

        def fake_exists(path):
            return path == command

        mock_readlink.return_value = command + '\0\05190bfb2 (deleted)'
        mock_isfile.side_effect = fake_exists
        self.assertTrue(f.match(usercmd))

    @mock.patch('os.readlink')
    @mock.patch('os.path.isfile')
    @mock.patch('os.path.exists')
    @mock.patch('os.access')
    def test_KillFilter_renamed_exe(self, mock_access, mock_exists,
                                    mock_isfile, mock_readlink):
        """Makes sure renamed exe's are killed correctly."""
        command = "/bin/commandddddd"
        f = filters.KillFilter("root", command)
        usercmd = ['kill', 1234]

        def fake_os_func(path, *args):
            return path == command

        mock_readlink.return_value = command + ';90bfb2 (deleted)'
        # Any file the filter opens while matching reads back as the
        # command path.
        m = mock.mock_open(read_data=command)
        with mock.patch("builtins.open", m, create=True):
            mock_isfile.side_effect = fake_os_func
            mock_exists.side_effect = fake_os_func
            mock_access.side_effect = fake_os_func
            self.assertTrue(f.match(usercmd))

    def test_ReadFileFilter(self):
        goodfn = '/good/file.name'
        f = filters.ReadFileFilter(goodfn)
        usercmd = ['cat', '/bad/file']
        self.assertFalse(f.match(usercmd))
        usercmd = ['cat', goodfn]
        self.assertEqual(['/bin/cat', goodfn], f.get_command(usercmd))
        self.assertTrue(f.match(usercmd))

    def test_IpFilter_non_netns(self):
        f = filters.IpFilter(self._ip, 'root')
        self.assertTrue(f.match(['ip', 'link', 'list']))
        self.assertTrue(f.match(['ip', '-s', 'link', 'list']))
        self.assertTrue(f.match(['ip', '-s', '-v', 'netns', 'add']))
        self.assertTrue(f.match(['ip', 'link', 'set', 'interface',
                                 'netns', 'somens']))

    def test_IpFilter_netns(self):
        f = filters.IpFilter(self._ip, 'root')
        self.assertFalse(f.match(['ip', 'netns', 'exec', 'foo']))
        self.assertFalse(f.match(['ip', 'netns', 'exec']))
        self.assertFalse(f.match(['ip', '-s', 'netns', 'exec']))
        self.assertFalse(f.match(['ip', '-l', '42', 'netns', 'exec']))
        self.assertFalse(f.match(['ip', 'net', 'exec', 'foo']))
        self.assertFalse(f.match(['ip', 'netns', 'e', 'foo']))

    def _test_IpFilter_netns_helper(self, action):
        """Assert that IpFilter accepts ``ip link <action>``."""
        f = filters.IpFilter(self._ip, 'root')
        self.assertTrue(f.match(['ip', 'link', action]))

    def test_IpFilter_netns_add(self):
        self._test_IpFilter_netns_helper('add')

    def test_IpFilter_netns_delete(self):
        self._test_IpFilter_netns_helper('delete')

    def test_IpFilter_netns_list(self):
        self._test_IpFilter_netns_helper('list')

    def test_IpNetnsExecFilter_match(self):
        f = filters.IpNetnsExecFilter(self._ip, 'root')
        self.assertTrue(
            f.match(['ip', 'netns', 'exec', 'foo', 'ip', 'link', 'list']))
        self.assertTrue(f.match(['ip', 'net', 'exec', 'foo', 'bar']))
        self.assertTrue(f.match(['ip', 'netn', 'e', 'foo', 'bar']))
        self.assertTrue(f.match(['ip', 'net', 'e', 'foo', 'bar']))
        self.assertTrue(f.match(['ip', 'net', 'exe', 'foo', 'bar']))

    def test_IpNetnsExecFilter_nomatch(self):
        f = filters.IpNetnsExecFilter(self._ip, 'root')
        self.assertFalse(f.match(['ip', 'link', 'list']))
        self.assertFalse(f.match(['ip', 'foo', 'bar', 'netns']))
        self.assertFalse(f.match(['ip', '-s', 'netns', 'exec']))
        self.assertFalse(f.match(['ip', '-l', '42', 'netns', 'exec']))
        self.assertFalse(f.match(['ip', 'netns exec', 'foo', 'bar', 'baz']))
        self.assertFalse(f.match([]))

        # verify that at least a NS is given
        self.assertFalse(f.match(['ip', 'netns', 'exec']))

    def test_IpNetnsExecFilter_nomatch_nonroot(self):
        f = filters.IpNetnsExecFilter(self._ip, 'user')
        self.assertFalse(
            f.match(['ip', 'netns', 'exec', 'foo', 'ip', 'link', 'list']))

    def test_match_filter_recurses_exec_command_filter_matches(self):
        filter_list = [filters.IpNetnsExecFilter(self._ip, 'root'),
                       filters.IpFilter(self._ip, 'root')]
        args = ['ip', 'netns', 'exec', 'foo', 'ip', 'link', 'list']

        self.assertIsNotNone(wrapper.match_filter(filter_list, args))

    def test_match_filter_recurses_exec_command_matches_user(self):
        filter_list = [filters.IpNetnsExecFilter(self._ip, 'root'),
                       filters.IpFilter(self._ip, 'user')]
        args = ['ip', 'netns', 'exec', 'foo', 'ip', 'link', 'list']

        # Currently ip netns exec requires root, so verify that
        # no non-root filter is matched, as that would escalate privileges
        self.assertRaises(wrapper.NoFilterMatched,
                          wrapper.match_filter, filter_list, args)

    def test_match_filter_recurses_exec_command_filter_does_not_match(self):
        filter_list = [filters.IpNetnsExecFilter(self._ip, 'root'),
                       filters.IpFilter(self._ip, 'root')]
        args = ['ip', 'netns', 'exec', 'foo', 'ip', 'netns', 'exec', 'bar',
                'ip', 'link', 'list']

        self.assertRaises(wrapper.NoFilterMatched,
                          wrapper.match_filter, filter_list, args)

    def test_ChainingRegExpFilter_match(self):
        filter_list = [filters.ChainingRegExpFilter('nice', 'root',
                                                    'nice', r'-?\d+'),
                       filters.CommandFilter('cat', 'root')]
        args = ['nice', '5', 'cat', '/a']
        dirs = ['/bin', '/usr/bin']

        self.assertIsNotNone(wrapper.match_filter(filter_list, args, dirs))

    def test_ChainingRegExpFilter_not_match(self):
        filter_list = [filters.ChainingRegExpFilter('nice', 'root',
                                                    'nice', r'-?\d+'),
                       filters.CommandFilter('cat', 'root')]
        args_invalid = (['nice', '5', 'ls', '/a'],
                        ['nice', '--5', 'cat', '/a'],
                        ['nice2', '5', 'cat', '/a'],
                        ['nice', 'cat', '/a'],
                        ['nice', '5'])
        dirs = ['/bin', '/usr/bin']

        for args in args_invalid:
            self.assertRaises(wrapper.NoFilterMatched,
                              wrapper.match_filter, filter_list, args, dirs)

    def test_ChainingRegExpFilter_multiple(self):
        filter_list = [filters.ChainingRegExpFilter('ionice', 'root', 'ionice',
                                                    '-c[0-3]'),
                       filters.ChainingRegExpFilter('ionice', 'root', 'ionice',
                                                    '-c[0-3]', '-n[0-7]'),
                       filters.CommandFilter('cat', 'root')]
        # both filters match to ['ionice', '-c2'], but only the second accepts
        args = ['ionice', '-c2', '-n7', 'cat', '/a']
        dirs = ['/bin', '/usr/bin']

        self.assertIsNotNone(wrapper.match_filter(filter_list, args, dirs))

    def test_ReadFileFilter_empty_args(self):
        goodfn = '/good/file.name'
        f = filters.ReadFileFilter(goodfn)
        self.assertFalse(f.match([]))
        self.assertFalse(f.match(None))

    def test_exec_dirs_search(self):
        # This test supposes you have /bin/cat or /usr/bin/cat locally
        f = filters.CommandFilter("cat", "root")
        usercmd = ['cat', '/f']
        self.assertTrue(f.match(usercmd))
        self.assertIn(f.get_command(usercmd, exec_dirs=['/bin', '/usr/bin']),
                      (['/bin/cat', '/f'], ['/usr/bin/cat', '/f']))

    def test_skips(self):
        # Check that all filters are skipped and that the last matches
        usercmd = ["cat", "/"]
        filtermatch = wrapper.match_filter(self.filters, usercmd)
        self.assertIs(filtermatch, self.filters[-1])

    def test_RootwrapConfig(self):
        raw = configparser.RawConfigParser()

        # Empty config should raise configparser.Error
        self.assertRaises(configparser.Error,
                          wrapper.RootwrapConfig, raw)

        # Check default values
        raw.set('DEFAULT', 'filters_path', '/a,/b')
        config = wrapper.RootwrapConfig(raw)
        self.assertEqual(['/a', '/b'], config.filters_path)
        self.assertEqual(os.environ["PATH"].split(':'), config.exec_dirs)

        with fixtures.EnvironmentVariable("PATH"):
            c = wrapper.RootwrapConfig(raw)
            self.assertEqual([], c.exec_dirs)

        self.assertFalse(config.use_syslog)
        self.assertEqual(logging.handlers.SysLogHandler.LOG_SYSLOG,
                         config.syslog_log_facility)
        self.assertEqual(logging.ERROR, config.syslog_log_level)

        # Check general values
        raw.set('DEFAULT', 'exec_dirs', '/a,/x')
        config = wrapper.RootwrapConfig(raw)
        self.assertEqual(['/a', '/x'], config.exec_dirs)

        raw.set('DEFAULT', 'use_syslog', 'oui')
        self.assertRaises(ValueError, wrapper.RootwrapConfig, raw)
        raw.set('DEFAULT', 'use_syslog', 'true')
        config = wrapper.RootwrapConfig(raw)
        self.assertTrue(config.use_syslog)

        raw.set('DEFAULT', 'syslog_log_facility', 'moo')
        self.assertRaises(ValueError, wrapper.RootwrapConfig, raw)
        raw.set('DEFAULT', 'syslog_log_facility', 'local0')
        config = wrapper.RootwrapConfig(raw)
        self.assertEqual(logging.handlers.SysLogHandler.LOG_LOCAL0,
                         config.syslog_log_facility)
        raw.set('DEFAULT', 'syslog_log_facility', 'LOG_AUTH')
        config = wrapper.RootwrapConfig(raw)
        self.assertEqual(logging.handlers.SysLogHandler.LOG_AUTH,
                         config.syslog_log_facility)

        raw.set('DEFAULT', 'syslog_log_level', 'bar')
        self.assertRaises(ValueError, wrapper.RootwrapConfig, raw)
        raw.set('DEFAULT', 'syslog_log_level', 'INFO')
        config = wrapper.RootwrapConfig(raw)
        self.assertEqual(logging.INFO, config.syslog_log_level)

    def test_getlogin(self):
        with mock.patch('os.getlogin') as os_getlogin:
            os_getlogin.return_value = 'foo'
            self.assertEqual('foo', wrapper._getlogin())

    def test_getlogin_bad(self):
        with mock.patch('os.getenv') as os_getenv:
            with mock.patch('os.getlogin') as os_getlogin:
                # os.getlogin fails, and only the third os.getenv lookup
                # yields a name.
                os_getenv.side_effect = [None, None, 'bar']
                os_getlogin.side_effect = OSError(
                    '[Errno 22] Invalid argument')
                self.assertEqual('bar', wrapper._getlogin())
                os_getlogin.assert_called_once_with()
                self.assertEqual(3, os_getenv.call_count)


class PathFilterTestCase(testtools.TestCase):
    """Tests for filters.PathFilter against real paths and symlinks."""

    def setUp(self):
        super().setUp()

        self.tmp_root_dir = tempfile.mkdtemp()
        # mkdtemp() leaves removal to the caller, and the TempDir fixture
        # below only removes the sub-directory it creates itself.  Cleanups
        # run in reverse order, so this one runs after the fixture's.
        self.addCleanup(shutil.rmtree, self.tmp_root_dir, ignore_errors=True)
        tmpdir = fixtures.TempDir(self.tmp_root_dir)
        self.useFixture(tmpdir)

        self.f = filters.PathFilter('/bin/chown', 'root', 'nova', tmpdir.path)

        def gen_name():
            return str(uuid.uuid4())

        self.SIMPLE_FILE_WITHIN_DIR = os.path.join(tmpdir.path, 'some')
        self.SIMPLE_FILE_OUTSIDE_DIR = os.path.join(self.tmp_root_dir, 'some')
        self.TRAVERSAL_WITHIN_DIR = os.path.join(tmpdir.path, 'a', '..',
                                                 'some')
        self.TRAVERSAL_OUTSIDE_DIR = os.path.join(tmpdir.path, '..', 'some')

        self.TRAVERSAL_SYMLINK_WITHIN_DIR = os.path.join(tmpdir.path,
                                                         gen_name())
        os.symlink(os.path.join(tmpdir.path, 'a', '..', 'a'),
                   self.TRAVERSAL_SYMLINK_WITHIN_DIR)

        self.TRAVERSAL_SYMLINK_OUTSIDE_DIR = os.path.join(tmpdir.path,
                                                          gen_name())
        os.symlink(os.path.join(tmpdir.path, 'a', '..', '..', '..', 'etc'),
                   self.TRAVERSAL_SYMLINK_OUTSIDE_DIR)

        self.SYMLINK_WITHIN_DIR = os.path.join(tmpdir.path, gen_name())
        os.symlink(os.path.join(tmpdir.path, 'a'), self.SYMLINK_WITHIN_DIR)

        self.SYMLINK_OUTSIDE_DIR = os.path.join(tmpdir.path, gen_name())
        os.symlink(os.path.join(self.tmp_root_dir, 'some_file'),
                   self.SYMLINK_OUTSIDE_DIR)

    def test_empty_args(self):
        self.assertFalse(self.f.match([]))
        self.assertFalse(self.f.match(None))

    def test_argument_pass_constraint(self):
        f = filters.PathFilter('/bin/chown', 'root', 'pass', 'pass')

        args = ['chown', 'something', self.SIMPLE_FILE_OUTSIDE_DIR]
        self.assertTrue(f.match(args))

    def test_argument_equality_constraint(self):
        temp_file_path = os.path.join(self.tmp_root_dir, 'spam/eggs')
        f = filters.PathFilter('/bin/chown', 'root', 'nova', temp_file_path)

        args = ['chown', 'nova', temp_file_path]
        self.assertTrue(f.match(args))

        args = ['chown', 'quantum', temp_file_path]
        self.assertFalse(f.match(args))

    def test_wrong_arguments_number(self):
        args = ['chown', '-c', 'nova', self.SIMPLE_FILE_WITHIN_DIR]
        self.assertFalse(self.f.match(args))

    def test_wrong_exec_command(self):
        args = ['wrong_exec', self.SIMPLE_FILE_WITHIN_DIR]
        self.assertFalse(self.f.match(args))

    def test_match(self):
        args = ['chown', 'nova', self.SIMPLE_FILE_WITHIN_DIR]
        self.assertTrue(self.f.match(args))

    def test_match_traversal(self):
        args = ['chown', 'nova', self.TRAVERSAL_WITHIN_DIR]
        self.assertTrue(self.f.match(args))

    def test_match_symlink(self):
        args = ['chown', 'nova', self.SYMLINK_WITHIN_DIR]
        self.assertTrue(self.f.match(args))

    def test_match_traversal_symlink(self):
        args = ['chown', 'nova', self.TRAVERSAL_SYMLINK_WITHIN_DIR]
        self.assertTrue(self.f.match(args))

    def test_reject(self):
        args = ['chown', 'nova', self.SIMPLE_FILE_OUTSIDE_DIR]
        self.assertFalse(self.f.match(args))

    def test_reject_traversal(self):
        args = ['chown', 'nova', self.TRAVERSAL_OUTSIDE_DIR]
        self.assertFalse(self.f.match(args))

    def test_reject_symlink(self):
        args = ['chown', 'nova', self.SYMLINK_OUTSIDE_DIR]
        self.assertFalse(self.f.match(args))

    def test_reject_traversal_symlink(self):
        args = ['chown', 'nova', self.TRAVERSAL_SYMLINK_OUTSIDE_DIR]
        self.assertFalse(self.f.match(args))

    def test_get_command(self):
        args = ['chown', 'nova', self.SIMPLE_FILE_WITHIN_DIR]
        expected = ['/bin/chown', 'nova', self.SIMPLE_FILE_WITHIN_DIR]

        self.assertEqual(expected, self.f.get_command(args))

    def test_get_command_traversal(self):
        args = ['chown', 'nova', self.TRAVERSAL_WITHIN_DIR]
        expected = ['/bin/chown', 'nova',
                    os.path.realpath(self.TRAVERSAL_WITHIN_DIR)]

        self.assertEqual(expected, self.f.get_command(args))

    def test_get_command_symlink(self):
        args = ['chown', 'nova', self.SYMLINK_WITHIN_DIR]
        expected = ['/bin/chown', 'nova',
                    os.path.realpath(self.SYMLINK_WITHIN_DIR)]

        self.assertEqual(expected, self.f.get_command(args))

    def test_get_command_traversal_symlink(self):
        args = ['chown', 'nova', self.TRAVERSAL_SYMLINK_WITHIN_DIR]
        expected = ['/bin/chown', 'nova',
                    os.path.realpath(self.TRAVERSAL_SYMLINK_WITHIN_DIR)]

        self.assertEqual(expected, self.f.get_command(args))


class RunOneCommandTestCase(testtools.TestCase):
    """Tests for the exit code produced by cmd.run_one_command()."""

    def _test_returncode_helper(self, returncode, expected):
        """Run one command with a stubbed subprocess and check sys.exit.

        :param returncode: value the stubbed subprocess's wait() returns
        :param expected: value sys.exit is expected to be called with
        """
        with mock.patch.object(wrapper, 'start_subprocess') as mock_start:
            with mock.patch('sys.exit') as mock_exit:
                mock_start.return_value.wait.return_value = returncode
                cmd.run_one_command(None, mock.Mock(), None, None)
                mock_exit.assert_called_once_with(expected)

    def test_positive_returncode(self):
        self._test_returncode_helper(1, 1)

    def test_negative_returncode(self):
        self._test_returncode_helper(-1, 129)


class DaemonCleanupException(Exception):
    """Marker exception raised by the stubbed get_server() below."""


class DaemonCleanupTestCase(testtools.TestCase):
    """Tests for daemon.daemon_start() failure handling."""

    @mock.patch('os.chmod')
    @mock.patch('shutil.rmtree')
    @mock.patch('tempfile.mkdtemp')
    @mock.patch('multiprocessing.managers.BaseManager.get_server',
                side_effect=DaemonCleanupException)
    def test_daemon_no_cleanup_for_uninitialized_server(self, gs, mkd, *args):
        # get_server() raises, so daemon_start() must let that exception
        # through rather than fail on a server that was never created.
        mkd.return_value = '/just_dir/123'
        self.assertRaises(DaemonCleanupException, daemon.daemon_start,
                          config=None, filters=None)